# -*- coding: utf-8 -*-
# Copyright 2012 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Contains the perfdiag gsutil command."""

from __future__ import absolute_import

import calendar
from collections import defaultdict
from collections import namedtuple
import contextlib
import cStringIO
import datetime
import httplib
import json
import logging
import math
import multiprocessing
import os
import random
import re
import socket
import string
import subprocess
import tempfile
import time

import boto
import boto.gs.connection

import gslib
from gslib.cloud_api import NotFoundException
from gslib.cloud_api import ServiceException
from gslib.cloud_api_helper import GetDownloadSerializationData
from gslib.command import Command
from gslib.command import DummyArgChecker
from gslib.command_argument import CommandArgument
from gslib.commands import config
from gslib.cs_api_map import ApiSelector
from gslib.exception import CommandException
from gslib.file_part import FilePart
from gslib.hashing_helper import CalculateB64EncodedMd5FromContents
from gslib.storage_url import StorageUrlFromString
from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
from gslib.util import CheckFreeSpace
from gslib.util import DivideAndCeil
from gslib.util import GetCloudApiInstance
from gslib.util import GetFileSize
from gslib.util import GetMaxRetryDelay
from gslib.util import HumanReadableToBytes
from gslib.util import IS_LINUX
from gslib.util import MakeBitsHumanReadable
from gslib.util import MakeHumanReadable
from gslib.util import Percentile
from gslib.util import ResumableThreshold

_SYNOPSIS = """
  gsutil perfdiag [-i in.json]
  gsutil perfdiag [-o out.json] [-n objects] [-c processes]
      [-k threads] [-p parallelism type] [-y slices] [-s size] [-d directory]
      [-t tests] url...
"""

_DETAILED_HELP_TEXT = ("""
<B>SYNOPSIS</B>
""" + _SYNOPSIS + """


<B>DESCRIPTION</B>
  The perfdiag command runs a suite of diagnostic tests for a given Google
  Cloud Storage bucket.

  The 'url' parameter must name an existing bucket (e.g. gs://foo) to which
  the user has write permission. Several test files will be uploaded to and
  downloaded from this bucket. All test files will be deleted at the completion
  of the diagnostic if it finishes successfully.

  gsutil performance can be impacted by many factors at the client, server,
  and in-between, such as: CPU speed; available memory; the access path to the
  local disk; network bandwidth; contention and error rates along the path
  between gsutil and Google; operating system buffering configuration; and
  firewalls and other network elements. The perfdiag command is provided so
  that customers can run a known measurement suite when troubleshooting
  performance problems.


<B>PROVIDING DIAGNOSTIC OUTPUT TO GOOGLE CLOUD STORAGE TEAM</B>
  If the Google Cloud Storage Team asks you to run a performance diagnostic,
  please use the following command, and email the output file (output.json)
  to gs-team@google.com:

    gsutil perfdiag -o output.json gs://your-bucket


<B>OPTIONS</B>
  -n          Sets the number of objects to use when downloading and uploading
              files during tests. Defaults to 5.

  -c          Sets the number of processes to use while running throughput
              experiments. The default value is 1.

  -k          Sets the number of threads per process to use while running
              throughput experiments. Each process will receive an equal number
              of threads. The default value is 1.

              Note: All specified threads and processes will be created, but
              may not be saturated with work if too few objects (specified
              with -n) and components (specified with -y) are specified.
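
              For example, to give each of two processes four threads, with
              eight objects to keep them busy:

                  gsutil perfdiag -n 8 -c 2 -k 4 gs://your-bucket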

  -p          Sets the type of parallelism to be used (only applicable when
              threads or processes are specified and threads * processes > 1).
              The default is to use fan. Must be one of the following:

              fan
                 Use one thread per object. This is akin to using gsutil -m cp,
                 with sliced object download / parallel composite upload
                 disabled.

              slice
                 Use Y (specified with -y) threads for each object, transferring
                 one object at a time. This is akin to using sliced object
                 download / parallel composite upload, without -m. Sliced
                 uploads are not supported for S3.

              both
                 Use Y (specified with -y) threads for each object, transferring
                 multiple objects at a time. This is akin to simultaneously
                 using sliced object download / parallel composite upload and
                 gsutil -m cp. Sliced uploads are not supported for S3.

  -y          Sets the number of slices to divide each file/object into while
              transferring data. Only applicable with the slice (or both)
              parallelism type. The default is 4 slices.
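
              For example, to transfer each object in four slices, one object
              at a time:

                  gsutil perfdiag -k 4 -p slice -y 4 gs://your-bucket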

  -s          Sets the size (in bytes) for each of the N (set with -n) objects
              used in the read and write throughput tests. The default is 1 MiB.
              This can also be specified using byte suffixes such as 500K or 1M.
              Note: these values are interpreted as multiples of 1024 (K=1024,
              M=1024*1024, etc.)
              Note: If the rthru_file or wthru_file test is performed, N (set
              with -n) times as much disk space as specified will be required.

  -d          Sets the directory to store temporary local files in. If not
              specified, a default temporary directory will be used.

  -t          Sets the list of diagnostic tests to perform. The default is to
              run the lat, rthru, and wthru diagnostic tests. Must be a
              comma-separated list containing one or more of the following:

              lat
                 For N (set with -n) objects, write the object, retrieve its
                 metadata, read the object, and finally delete the object.
                 Record the latency of each operation.

              list
                 Write N (set with -n) objects to the bucket, record how long
                 it takes for the eventually consistent listing call to return
                 the N objects in its result, delete the N objects, then record
                 how long it takes listing to stop returning the N objects.
                 This test is off by default.

              rthru
                 Runs N (set with -n) read operations, with at most C
                 (set with -c) reads outstanding at any given time.

              rthru_file
                 The same as rthru, but simultaneously writes data to the disk,
                 to gauge the performance impact of the local disk on downloads.

              wthru
                 Runs N (set with -n) write operations, with at most C
                 (set with -c) writes outstanding at any given time.

              wthru_file
                 The same as wthru, but simultaneously reads data from the disk,
                 to gauge the performance impact of the local disk on uploads.
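
              For example, to run only the latency and listing tests with ten
              objects each:

                  gsutil perfdiag -t lat,list -n 10 gs://your-bucket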

  -m          Adds metadata to the result JSON file. Multiple -m values can be
              specified. Example:

                  gsutil perfdiag -m "key1:val1" -m "key2:val2" gs://bucketname

              Each metadata key will be added to the top-level "metadata"
              dictionary in the output JSON file.

  -o          Writes the results of the diagnostic to an output file. The output
              is a JSON file containing system information and performance
              diagnostic results. The file can be read and reported later using
              the -i option.

  -i          Reads the JSON output file created using the -o option and prints
              a formatted description of the results.
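
              For example, to save results and reprint them later:

                  gsutil perfdiag -o out.json gs://your-bucket
                  gsutil perfdiag -i out.json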


<B>MEASURING AVAILABILITY</B>
  The perfdiag command ignores the boto num_retries configuration parameter.
  Instead, it always retries on HTTP errors in the 500 range and keeps track of
  how many 500 errors were encountered during the test. The availability
  measurement is reported at the end of the test.

  Note that HTTP responses are only recorded when the request was made in a
  single process. When using multiple processes or threads, read and write
  throughput measurements are performed in an external process, so the
  availability numbers reported won't include the throughput measurements.


<B>NOTE</B>
  The perfdiag command collects system information. It collects your IP address,
  executes DNS queries to Google servers and collects the results, and collects
  network statistics information from the output of netstat -s. It will also
  attempt to connect to your proxy server if you have one configured. None of
  this information will be sent to Google unless you choose to send it.
""")

FileDataTuple = namedtuple(
    'FileDataTuple',
    'size md5 data')

# Describes one object in a fanned download. If need_to_slice is specified as
# True, the object should be downloaded with the slice strategy. Other field
# names are the same as documented in PerfDiagCommand.Download.
FanDownloadTuple = namedtuple(
    'FanDownloadTuple',
    'need_to_slice object_name file_name serialization_data')

# Describes one slice in a sliced download.
# Field names are the same as documented in PerfDiagCommand.Download.
SliceDownloadTuple = namedtuple(
    'SliceDownloadTuple',
    'object_name file_name serialization_data start_byte end_byte')

# Describes one file in a fanned upload. If need_to_slice is specified as
# True, the file should be uploaded with the slice strategy. Other field
# names are the same as documented in PerfDiagCommand.Upload.
FanUploadTuple = namedtuple(
    'FanUploadTuple',
    'need_to_slice file_name object_name use_file')

# Describes one slice in a sliced upload.
# Field names are the same as documented in PerfDiagCommand.Upload.
SliceUploadTuple = namedtuple(
    'SliceUploadTuple',
    'file_name object_name use_file file_start file_size')

# Dict storing file_path:FileDataTuple for each temporary file used by
# perfdiag. This data should be kept outside of the PerfDiagCommand class
# since calls to Apply will make copies of all member data.
temp_file_dict = {}


class Error(Exception):
  """Base exception class for this module."""
  pass


class InvalidArgument(Error):
  """Raised on invalid arguments to functions."""
  pass

def _DownloadObject(cls, args, thread_state=None):
  """Function argument to apply for performing fanned parallel downloads.

  Args:
    cls: The calling PerfDiagCommand class instance.
    args: A FanDownloadTuple object describing this download.
    thread_state: gsutil Cloud API instance to use for the operation.
  """
  cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
  if args.need_to_slice:
    cls.PerformSlicedDownload(args.object_name, args.file_name,
                              args.serialization_data)
  else:
    cls.Download(args.object_name, args.file_name, args.serialization_data)


def _DownloadSlice(cls, args, thread_state=None):
  """Function argument to apply for performing sliced downloads.

  Args:
    cls: The calling PerfDiagCommand class instance.
    args: A SliceDownloadTuple object describing this download.
    thread_state: gsutil Cloud API instance to use for the operation.
  """
  cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
  cls.Download(args.object_name, args.file_name, args.serialization_data,
               args.start_byte, args.end_byte)


def _UploadObject(cls, args, thread_state=None):
  """Function argument to apply for performing fanned parallel uploads.

  Args:
    cls: The calling PerfDiagCommand class instance.
    args: A FanUploadTuple object describing this upload.
    thread_state: gsutil Cloud API instance to use for the operation.
  """
  cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
  if args.need_to_slice:
    cls.PerformSlicedUpload(args.file_name, args.object_name, args.use_file)
  else:
    cls.Upload(args.file_name, args.object_name, args.use_file)


def _UploadSlice(cls, args, thread_state=None):
  """Function argument to apply for performing sliced parallel uploads.

  Args:
    cls: The calling PerfDiagCommand class instance.
    args: A SliceUploadTuple object describing this upload.
    thread_state: gsutil Cloud API instance to use for the operation.
  """
  cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
  cls.Upload(args.file_name, args.object_name, args.use_file,
             args.file_start, args.file_size)


def _DeleteWrapper(cls, object_name, thread_state=None):
  """Function argument to apply for performing parallel object deletions.

  Args:
    cls: The calling PerfDiagCommand class instance.
    object_name: The object name to delete from the test bucket.
    thread_state: gsutil Cloud API instance to use for the operation.
  """
  cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
  cls.Delete(object_name)


def _PerfdiagExceptionHandler(cls, e):
  """Simple exception handler to allow post-completion status."""
  cls.logger.error(str(e))


def _DummyTrackerCallback(_):
  pass


class DummyFile(object):
  """A dummy, file-like object that throws away everything written to it."""

  def write(self, *args, **kwargs):  # pylint: disable=invalid-name
    pass

  def close(self):  # pylint: disable=invalid-name
    pass


# Many functions in perfdiag re-define a temporary function based on a
# variable from a loop, resulting in a false positive from the linter.
# pylint: disable=cell-var-from-loop
class PerfDiagCommand(Command):
  """Implementation of gsutil perfdiag command."""

  # Command specification. See base class for documentation.
  command_spec = Command.CreateCommandSpec(
      'perfdiag',
      command_name_aliases=['diag', 'diagnostic', 'perf', 'performance'],
      usage_synopsis=_SYNOPSIS,
      min_args=0,
      max_args=1,
      supported_sub_args='n:c:k:p:y:s:d:t:m:i:o:',
      file_url_ok=False,
      provider_url_ok=False,
      urls_start_arg=0,
      gs_api_support=[ApiSelector.XML, ApiSelector.JSON],
      gs_default_api=ApiSelector.JSON,
      argparse_arguments=[
          CommandArgument.MakeNCloudBucketURLsArgument(1)
      ]
  )
  # Help specification. See help_provider.py for documentation.
  help_spec = Command.HelpSpec(
      help_name='perfdiag',
      help_name_aliases=[],
      help_type='command_help',
      help_one_line_summary='Run performance diagnostic',
      help_text=_DETAILED_HELP_TEXT,
      subcommand_help_text={},
  )

  # Byte sizes to use for latency testing files.
  # TODO: Consider letting the user specify these sizes with a configuration
  # parameter.
  test_lat_file_sizes = (
      0,  # 0 bytes
      1024,  # 1 KiB
      102400,  # 100 KiB
      1048576,  # 1 MiB
  )

  # Test names.
  RTHRU = 'rthru'
  RTHRU_FILE = 'rthru_file'
  WTHRU = 'wthru'
  WTHRU_FILE = 'wthru_file'
  LAT = 'lat'
  LIST = 'list'

  # Parallelism strategies.
  FAN = 'fan'
  SLICE = 'slice'
  BOTH = 'both'

  # List of all diagnostic tests.
  ALL_DIAG_TESTS = (RTHRU, RTHRU_FILE, WTHRU, WTHRU_FILE, LAT, LIST)

  # List of diagnostic tests to run by default.
  DEFAULT_DIAG_TESTS = (RTHRU, WTHRU, LAT)

  # List of parallelism strategies.
  PARALLEL_STRATEGIES = (FAN, SLICE, BOTH)

  # Google Cloud Storage XML API endpoint host.
  XML_API_HOST = boto.config.get(
      'Credentials', 'gs_host', boto.gs.connection.GSConnection.DefaultHost)
  # Google Cloud Storage XML API endpoint port.
  XML_API_PORT = boto.config.get('Credentials', 'gs_port', 80)

  # Maximum number of times to retry requests on 5xx errors.
  MAX_SERVER_ERROR_RETRIES = 5
  # Maximum number of times to retry requests on more serious errors like
  # the socket breaking.
  MAX_TOTAL_RETRIES = 10

  # The default buffer size in boto's Key object is set to 8 KiB. This becomes a
  # bottleneck at high throughput rates, so we increase it.
  KEY_BUFFER_SIZE = 16384

  # The maximum number of bytes to generate pseudo-randomly before beginning
  # to repeat bytes. This number was chosen as the next prime larger than 5 MiB.
  MAX_UNIQUE_RANDOM_BYTES = 5242883

  # Maximum amount of time, in seconds, we will wait for object listings to
  # reflect what we expect in the listing tests.
  MAX_LISTING_WAIT_TIME = 60.0

  def _Exec(self, cmd, raise_on_error=True, return_output=False,
            mute_stderr=False):
    """Executes a command in a subprocess.

    Args:
      cmd: List containing the command to execute.
      raise_on_error: Whether or not to raise an exception when a process exits
          with a non-zero return code.
      return_output: If set to True, the return value of the function is the
          stdout of the process.
      mute_stderr: If set to True, the stderr of the process is not printed to
          the console.

    Returns:
      The return code of the process or the stdout if return_output is set.

    Raises:
      Exception: If raise_on_error is set to True and any process exits with a
          non-zero return code.
    """
    self.logger.debug('Running command: %s', cmd)
    stderr = subprocess.PIPE if mute_stderr else None
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=stderr)
    (stdoutdata, _) = p.communicate()
    if raise_on_error and p.returncode:
      raise CommandException("Received non-zero return code (%d) from "
                             "subprocess '%s'." % (p.returncode, ' '.join(cmd)))
    return stdoutdata if return_output else p.returncode
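
  # Illustrative _Exec usage, mirroring how netstat statistics are gathered
  # later in this module (netstat -s exits non-zero for -s on Linux, so
  # errors are not raised):
  #
  #   netstat_output = self._Exec(['netstat', '-s'], return_output=True,
  #                               raise_on_error=False)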

  def _WarnIfLargeData(self):
    """Outputs a warning message if a large amount of data is being used."""
    if self.num_objects * self.thru_filesize > HumanReadableToBytes('2GiB'):
      self.logger.info('This is a large operation, and could take a while.')

  def _MakeTempFile(self, file_size=0, mem_metadata=False,
                    mem_data=False, prefix='gsutil_test_file'):
    """Creates a temporary file of the given size and returns its path.

    Args:
      file_size: The size of the temporary file to create.
      mem_metadata: If true, store the file's md5 and size in memory at
                    temp_file_dict[fpath].md5 and temp_file_dict[fpath].size.
      mem_data: If true, store the file data in memory at
                temp_file_dict[fpath].data.
      prefix: The prefix to use for the temporary file. Defaults to
              gsutil_test_file.

    Returns:
      The file path of the created temporary file.
    """
    fd, fpath = tempfile.mkstemp(suffix='.bin', prefix=prefix,
                                 dir=self.directory, text=False)
    with os.fdopen(fd, 'wb') as fp:
      random_bytes = os.urandom(min(file_size,
                                    self.MAX_UNIQUE_RANDOM_BYTES))
      total_bytes_written = 0
      while total_bytes_written < file_size:
        num_bytes = min(self.MAX_UNIQUE_RANDOM_BYTES,
                        file_size - total_bytes_written)
        fp.write(random_bytes[:num_bytes])
        total_bytes_written += num_bytes

    if mem_metadata or mem_data:
      with open(fpath, 'rb') as fp:
        file_size = GetFileSize(fp) if mem_metadata else None
        md5 = CalculateB64EncodedMd5FromContents(fp) if mem_metadata else None
        data = fp.read() if mem_data else None
        temp_file_dict[fpath] = FileDataTuple(file_size, md5, data)

    self.temporary_files.add(fpath)
    return fpath
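
  # Worked example of the write loop in _MakeTempFile: for a file_size of
  # 12 MiB (12582912 bytes) and MAX_UNIQUE_RANDOM_BYTES of 5242883, the loop
  # writes the same 5242883-byte random buffer twice, then its first 2097146
  # bytes, making large test files cheap to generate while remaining largely
  # incompressible.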

  def _SetUp(self):
    """Performs setup operations needed before diagnostics can be run."""

    # Stores test result data.
    self.results = {}
    # Set of file paths for local temporary files.
    self.temporary_files = set()
    # Set of names for test objects that exist in the test bucket.
    self.temporary_objects = set()
    # Total number of HTTP requests made.
    self.total_requests = 0
    # Total number of HTTP 5xx errors.
    self.request_errors = 0
    # Number of responses, keyed by response code.
    self.error_responses_by_code = defaultdict(int)
    # Total number of socket errors.
    self.connection_breaks = 0
    # Boolean to prevent doing cleanup twice.
    self.teardown_completed = False

    # Create files for latency test.
    if self.LAT in self.diag_tests:
      self.latency_files = []
      for file_size in self.test_lat_file_sizes:
        fpath = self._MakeTempFile(file_size, mem_metadata=True, mem_data=True)
        self.latency_files.append(fpath)

    # Create files for throughput tests.
    if self.diag_tests.intersection(
        (self.RTHRU, self.WTHRU, self.RTHRU_FILE, self.WTHRU_FILE)):
      # Create a file for warming up the TCP connection.
      self.tcp_warmup_file = self._MakeTempFile(
          5 * 1024 * 1024, mem_metadata=True, mem_data=True)

      # For in memory tests, throughput tests transfer the same object N times
      # instead of creating N objects, in order to avoid excessive memory usage.
      if self.diag_tests.intersection((self.RTHRU, self.WTHRU)):
        self.mem_thru_file_name = self._MakeTempFile(
            self.thru_filesize, mem_metadata=True, mem_data=True)
        self.mem_thru_object_name = os.path.basename(self.mem_thru_file_name)

      # For tests that use disk I/O, it is necessary to create N objects
      # in order to properly measure the performance impact of seeks.
      if self.diag_tests.intersection((self.RTHRU_FILE, self.WTHRU_FILE)):
        # List of file names and corresponding object names to use for file
        # throughput tests.
        self.thru_file_names = []
        self.thru_object_names = []

        free_disk_space = CheckFreeSpace(self.directory)
        if free_disk_space >= self.thru_filesize * self.num_objects:
          self.logger.info('\nCreating %d local files each of size %s.'
                           % (self.num_objects,
                              MakeHumanReadable(self.thru_filesize)))
          self._WarnIfLargeData()
          for _ in range(self.num_objects):
            file_name = self._MakeTempFile(self.thru_filesize,
                                           mem_metadata=True)
            self.thru_file_names.append(file_name)
            self.thru_object_names.append(os.path.basename(file_name))
        else:
          raise CommandException(
              'Not enough free disk space for throughput files: '
              '%s of disk space required, but only %s available.'
              % (MakeHumanReadable(self.thru_filesize * self.num_objects),
                 MakeHumanReadable(free_disk_space)))

    # Dummy file buffer to use for downloading that goes nowhere.
    self.discard_sink = DummyFile()

    # Filter out misleading progress callback output and the incorrect
    # suggestion to use gsutil -m perfdiag.
    self.logger.addFilter(self._PerfdiagFilter())

  def _TearDown(self):
    """Performs operations to clean things up after performing diagnostics."""
    if not self.teardown_completed:
      temp_file_dict.clear()

      try:
        for fpath in self.temporary_files:
          os.remove(fpath)
        if self.delete_directory:
          os.rmdir(self.directory)
      except OSError:
        pass

      if self.threads > 1 or self.processes > 1:
        args = [obj for obj in self.temporary_objects]
        self.Apply(_DeleteWrapper, args, _PerfdiagExceptionHandler,
                   arg_checker=DummyArgChecker,
                   parallel_operations_override=True,
                   process_count=self.processes, thread_count=self.threads)
      else:
        for object_name in self.temporary_objects:
          self.Delete(object_name)
    self.teardown_completed = True

  @contextlib.contextmanager
  def _Time(self, key, bucket):
    """A context manager that measures time.

    A context manager that prints a status message before and after executing
    the inner command and times how long the inner command takes. Keeps track of
    the timing, aggregated by the given key.

    Args:
      key: The key to insert the timing value into a dictionary bucket.
      bucket: A dictionary to place the timing value in.

    Yields:
      For the context manager.
    """
    self.logger.info('%s starting...', key)
    t0 = time.time()
    yield
    t1 = time.time()
    bucket[key].append(t1 - t0)
    self.logger.info('%s done.', key)

  def _RunOperation(self, func):
    """Runs an operation with retry logic.

    Args:
      func: The function to run.

    Returns:
      The return value of func if the operation succeeded, or None if it was
      aborted due to reaching a retry limit.
    """
    # We retry on httplib exceptions that can happen if the socket was closed
    # by the remote party or the connection broke because of network issues.
    # Only the BotoServerError is counted as a 5xx error towards the retry
    # limit.
    success = False
    server_error_retried = 0
    total_retried = 0
    i = 0
    return_val = None
    while not success:
      next_sleep = min(random.random() * (2 ** i) + 1, GetMaxRetryDelay())
      try:
        return_val = func()
        self.total_requests += 1
        success = True
      except tuple(self.exceptions) as e:
        total_retried += 1
        # Grow the exponential backoff window for the next attempt.
        i += 1
        if total_retried > self.MAX_TOTAL_RETRIES:
          self.logger.info('Reached maximum total retries. Not retrying.')
          break
        if isinstance(e, ServiceException):
          if e.status >= 500:
            self.error_responses_by_code[e.status] += 1
            self.total_requests += 1
            self.request_errors += 1
            server_error_retried += 1
            time.sleep(next_sleep)
          else:
            raise
          if server_error_retried > self.MAX_SERVER_ERROR_RETRIES:
            self.logger.info(
                'Reached maximum server error retries. Not retrying.')
            break
        else:
          self.connection_breaks += 1
    return return_val
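
  # Sketch of the backoff behavior above, assuming GetMaxRetryDelay() returns
  # 32: the sleep window doubles with each failed attempt, from [1, 2) seconds
  # after the first failure toward [1, 33), capped at 32 seconds; the sleep is
  # only applied after 5xx responses.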

  def _RunLatencyTests(self):
    """Runs latency tests."""
    # Stores timing information for each category of operation.
    self.results['latency'] = defaultdict(list)

    for i in range(self.num_objects):
      self.logger.info('\nRunning latency iteration %d...', i+1)
      for fpath in self.latency_files:
        file_data = temp_file_dict[fpath]
        url = self.bucket_url.Clone()
        url.object_name = os.path.basename(fpath)
        file_size = file_data.size
        readable_file_size = MakeHumanReadable(file_size)

        self.logger.info(
            "\nFile of size %s located on disk at '%s' being diagnosed in the "
            "cloud at '%s'.", readable_file_size, fpath, url)

        upload_target = StorageUrlToUploadObjectMetadata(url)

        def _Upload():
          io_fp = cStringIO.StringIO(file_data.data)
          with self._Time('UPLOAD_%d' % file_size, self.results['latency']):
            self.gsutil_api.UploadObject(
                io_fp, upload_target, size=file_size, provider=self.provider,
                fields=['name'])
        self._RunOperation(_Upload)

        def _Metadata():
          with self._Time('METADATA_%d' % file_size, self.results['latency']):
            return self.gsutil_api.GetObjectMetadata(
                url.bucket_name, url.object_name,
                provider=self.provider, fields=['name', 'contentType',
                                                'mediaLink', 'size'])
        # Download will get the metadata first if we don't pass it in.
        download_metadata = self._RunOperation(_Metadata)
        serialization_data = GetDownloadSerializationData(download_metadata)

        def _Download():
          with self._Time('DOWNLOAD_%d' % file_size, self.results['latency']):
            self.gsutil_api.GetObjectMedia(
                url.bucket_name, url.object_name, self.discard_sink,
                provider=self.provider, serialization_data=serialization_data)
        self._RunOperation(_Download)

        def _Delete():
          with self._Time('DELETE_%d' % file_size, self.results['latency']):
            self.gsutil_api.DeleteObject(url.bucket_name, url.object_name,
                                         provider=self.provider)
        self._RunOperation(_Delete)

  class _PerfdiagFilter(logging.Filter):

    def filter(self, record):
      # Used to prevent unnecessary output when using multiprocessing.
      msg = record.getMessage()
      return not (('Copying file:///' in msg) or ('Copying gs://' in msg) or
                  ('Computing CRC' in msg) or ('gsutil -m perfdiag' in msg))

  def _PerfdiagExceptionHandler(self, e):
    """Simple exception handler to allow post-completion status."""
    self.logger.error(str(e))

  def PerformFannedDownload(self, need_to_slice, object_names, file_names,
                            serialization_data):
    """Performs a parallel download of multiple objects using the fan strategy.

    Args:
      need_to_slice: If True, additionally apply the slice strategy to each
                     object in object_names.
      object_names: A list of object names to be downloaded. Each object must
                    already exist in the test bucket.
      file_names: A list, corresponding by index to object_names, of file names
                  for downloaded data. If None, discard downloaded data.
      serialization_data: A list, corresponding by index to object_names,
                          of serialization data for each object.
    """
    args = []
    for i in range(len(object_names)):
      file_name = file_names[i] if file_names else None
      args.append(FanDownloadTuple(
          need_to_slice, object_names[i], file_name,
          serialization_data[i]))
    self.Apply(_DownloadObject, args, _PerfdiagExceptionHandler,
               ('total_requests', 'request_errors'),
               arg_checker=DummyArgChecker, parallel_operations_override=True,
               process_count=self.processes, thread_count=self.threads)

  def PerformSlicedDownload(self, object_name, file_name, serialization_data):
    """Performs a download of an object using the slice strategy.

    Args:
      object_name: The name of the object to download.
      file_name: The name of the file to download data to, or None if data
                 should be discarded.
      serialization_data: The serialization data for the object.
    """
    if file_name:
      with open(file_name, 'ab') as fp:
        fp.truncate(self.thru_filesize)
    component_size = DivideAndCeil(self.thru_filesize, self.num_slices)
    args = []
    for i in range(self.num_slices):
      start_byte = i * component_size
      end_byte = min((i + 1) * component_size - 1, self.thru_filesize - 1)
      args.append(SliceDownloadTuple(object_name, file_name, serialization_data,
                                     start_byte, end_byte))
    self.Apply(_DownloadSlice, args, _PerfdiagExceptionHandler,
               ('total_requests', 'request_errors'),
               arg_checker=DummyArgChecker, parallel_operations_override=True,
               process_count=self.processes, thread_count=self.threads)
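
  # Worked example of the slice math above: with thru_filesize = 1048576
  # (1 MiB) and num_slices = 4, component_size is 262144 and the requested
  # byte ranges are 0-262143, 262144-524287, 524288-786431, and
  # 786432-1048575.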

  def PerformFannedUpload(self, need_to_slice, file_names, object_names,
                          use_file):
    """Performs a parallel upload of multiple files using the fan strategy.

    The metadata for each file should be present in temp_file_dict prior to
    calling. Also, if use_file is specified as False, the data for each file
    should be present in temp_file_dict.

    Args:
      need_to_slice: If True, additionally apply the slice strategy to each
                     file in file_names.
      file_names: A list of file names to be uploaded.
      object_names: A list, corresponding by index to file_names, of object
                    names to upload data to.
      use_file: If true, use disk I/O, otherwise read upload data from memory.
    """
    args = []
    for i in range(len(file_names)):
      args.append(FanUploadTuple(
          need_to_slice, file_names[i], object_names[i], use_file))
    self.Apply(_UploadObject, args, _PerfdiagExceptionHandler,
               ('total_requests', 'request_errors'),
               arg_checker=DummyArgChecker, parallel_operations_override=True,
               process_count=self.processes, thread_count=self.threads)

  def PerformSlicedUpload(self, file_name, object_name, use_file):
    """Performs a parallel upload of a file using the slice strategy.

    The metadata for file_name should be present in temp_file_dict prior
    to calling. Also, the data for file_name should be present in
    temp_file_dict if use_file is specified as False.

    Args:
      file_name: The name of the file to upload.
      object_name: The name of the object to upload to.
      use_file: If true, use disk I/O, otherwise read upload data from memory.
    """
    # Divide the file into components.
    component_size = DivideAndCeil(self.thru_filesize, self.num_slices)
    component_object_names = (
        [object_name + str(i) for i in range(self.num_slices)])

    args = []
    for i in range(self.num_slices):
      component_start = i * component_size
      component_size = min(component_size,
                           temp_file_dict[file_name].size - component_start)
      args.append(SliceUploadTuple(file_name, component_object_names[i],
                                   use_file, component_start, component_size))

    # Upload the components in parallel.
    try:
      self.Apply(_UploadSlice, args, _PerfdiagExceptionHandler,
                 ('total_requests', 'request_errors'),
                 arg_checker=DummyArgChecker, parallel_operations_override=True,
                 process_count=self.processes, thread_count=self.threads)

      # Compose the components into an object.
      request_components = []
      for i in range(self.num_slices):
        src_obj_metadata = (
            apitools_messages.ComposeRequest.SourceObjectsValueListEntry(
                name=component_object_names[i]))
        request_components.append(src_obj_metadata)

      dst_obj_metadata = apitools_messages.Object()
      dst_obj_metadata.name = object_name
      dst_obj_metadata.bucket = self.bucket_url.bucket_name
      def _Compose():
        self.gsutil_api.ComposeObject(request_components, dst_obj_metadata,
                                      provider=self.provider)
      self._RunOperation(_Compose)
    finally:
      # Delete the temporary components.
      self.Apply(_DeleteWrapper, component_object_names,
                 _PerfdiagExceptionHandler,
                 ('total_requests', 'request_errors'),
                 arg_checker=DummyArgChecker, parallel_operations_override=True,
                 process_count=self.processes, thread_count=self.threads)
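
  # End-to-end shape of a sliced upload of object 'foo' with num_slices = 3:
  # components 'foo0', 'foo1', and 'foo2' are uploaded in parallel, composed
  # into 'foo' via ComposeObject, and the temporary components are deleted
  # even if the upload fails.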

  def _RunReadThruTests(self, use_file=False):
    """Runs read throughput tests."""
    test_name = 'read_throughput_file' if use_file else 'read_throughput'
    file_io_string = 'with file I/O' if use_file else ''
    self.logger.info(
        '\nRunning read throughput tests %s (%s objects of size %s)' %
        (file_io_string, self.num_objects,
         MakeHumanReadable(self.thru_filesize)))
    self._WarnIfLargeData()

    self.results[test_name] = {'file_size': self.thru_filesize,
                               'processes': self.processes,
                               'threads': self.threads,
                               'parallelism': self.parallel_strategy
                              }

    # Copy the file(s) to the test bucket, and also get the serialization data
    # so that we can pass it to download.
    if use_file:
      # For the test with file I/O, use N files on disk to preserve seek
      # performance.
      file_names = self.thru_file_names
      object_names = self.thru_object_names
      serialization_data = []
      for i in range(self.num_objects):
        self.temporary_objects.add(self.thru_object_names[i])
        if self.WTHRU_FILE in self.diag_tests:
          # If we ran the WTHRU_FILE test, then the objects already exist.
          obj_metadata = self.gsutil_api.GetObjectMetadata(
              self.bucket_url.bucket_name, self.thru_object_names[i],
              fields=['size', 'mediaLink'], provider=self.bucket_url.scheme)
        else:
          obj_metadata = self.Upload(self.thru_file_names[i],
                                     self.thru_object_names[i], use_file)

        # File overwrite causes performance issues with sliced downloads.
        # Delete the file and reopen it for download. This matches what a real
        # download would look like.
        os.unlink(self.thru_file_names[i])
        open(self.thru_file_names[i], 'ab').close()
        serialization_data.append(GetDownloadSerializationData(obj_metadata))
    else:
      # For the in-memory test, use only one file but copy it num_objects
      # times, to allow scalability in num_objects.
      self.temporary_objects.add(self.mem_thru_object_name)
      obj_metadata = self.Upload(self.mem_thru_file_name,
                                 self.mem_thru_object_name, use_file)
      file_names = None
      object_names = [self.mem_thru_object_name] * self.num_objects
      serialization_data = (
          [GetDownloadSerializationData(obj_metadata)] * self.num_objects)

    # Warm up the TCP connection.
    warmup_obj_name = os.path.basename(self.tcp_warmup_file)
    self.temporary_objects.add(warmup_obj_name)
    self.Upload(self.tcp_warmup_file, warmup_obj_name)
    self.Download(warmup_obj_name)

    t0 = time.time()
    if self.processes == 1 and self.threads == 1:
      for i in range(self.num_objects):
        file_name = file_names[i] if use_file else None
        self.Download(object_names[i], file_name, serialization_data[i])
    else:
      if self.parallel_strategy in (self.FAN, self.BOTH):
        need_to_slice = (self.parallel_strategy == self.BOTH)
        self.PerformFannedDownload(need_to_slice, object_names, file_names,
                                   serialization_data)
      elif self.parallel_strategy == self.SLICE:
        for i in range(self.num_objects):
          file_name = file_names[i] if use_file else None
          self.PerformSlicedDownload(
              object_names[i], file_name, serialization_data[i])
    t1 = time.time()

    time_took = t1 - t0
    total_bytes_copied = self.thru_filesize * self.num_objects
    bytes_per_second = total_bytes_copied / time_took

    self.results[test_name]['time_took'] = time_took
    self.results[test_name]['total_bytes_copied'] = total_bytes_copied
    self.results[test_name]['bytes_per_second'] = bytes_per_second

  def _RunWriteThruTests(self, use_file=False):
    """Runs write throughput tests."""
    test_name = 'write_throughput_file' if use_file else 'write_throughput'
    file_io_string = 'with file I/O' if use_file else ''
    self.logger.info(
        '\nRunning write throughput tests %s (%s objects of size %s)' %
        (file_io_string, self.num_objects,
         MakeHumanReadable(self.thru_filesize)))
    self._WarnIfLargeData()

    self.results[test_name] = {'file_size': self.thru_filesize,
                               'processes': self.processes,
                               'threads': self.threads,
                               'parallelism': self.parallel_strategy}

    # Warm up the TCP connection.
    warmup_obj_name = os.path.basename(self.tcp_warmup_file)
    self.temporary_objects.add(warmup_obj_name)
    self.Upload(self.tcp_warmup_file, warmup_obj_name)

    if use_file:
      # For the test with file I/O, use N files on disk to preserve seek
      # performance.
      file_names = self.thru_file_names
      object_names = self.thru_object_names
    else:
      # For the in-memory test, use only one file but copy it num_objects
      # times, to allow for scalability in num_objects.
      file_names = [self.mem_thru_file_name] * self.num_objects
      object_names = (
          [self.mem_thru_object_name + str(i) for i in range(self.num_objects)])

    for object_name in object_names:
      self.temporary_objects.add(object_name)

    t0 = time.time()
    if self.processes == 1 and self.threads == 1:
      for i in range(self.num_objects):
        self.Upload(file_names[i], object_names[i], use_file)
    else:
      if self.parallel_strategy in (self.FAN, self.BOTH):
        need_to_slice = (self.parallel_strategy == self.BOTH)
        self.PerformFannedUpload(need_to_slice, file_names, object_names,
                                 use_file)
      elif self.parallel_strategy == self.SLICE:
        for i in range(self.num_objects):
          self.PerformSlicedUpload(file_names[i], object_names[i], use_file)
    t1 = time.time()

    time_took = t1 - t0
    total_bytes_copied = self.thru_filesize * self.num_objects
    bytes_per_second = total_bytes_copied / time_took

    self.results[test_name]['time_took'] = time_took
    self.results[test_name]['total_bytes_copied'] = total_bytes_copied
    self.results[test_name]['bytes_per_second'] = bytes_per_second

  def _RunListTests(self):
    """Runs eventual consistency listing latency tests."""
    self.results['listing'] = {'num_files': self.num_objects}

    # Generate N random objects to put into the bucket.
    list_prefix = 'gsutil-perfdiag-list-'
    list_fpaths = []
    list_objects = []
    args = []
    for _ in xrange(self.num_objects):
      fpath = self._MakeTempFile(0, mem_data=True, mem_metadata=True,
                                 prefix=list_prefix)
      list_fpaths.append(fpath)
      object_name = os.path.basename(fpath)
      list_objects.append(object_name)
      args.append(FanUploadTuple(False, fpath, object_name, False))
      self.temporary_objects.add(object_name)

    # Add the objects to the bucket.
    self.logger.info(
        '\nWriting %s objects for listing test...', self.num_objects)

    self.Apply(_UploadObject, args, _PerfdiagExceptionHandler,
               arg_checker=DummyArgChecker)

    list_latencies = []
    files_seen = []
    total_start_time = time.time()
    expected_objects = set(list_objects)
    found_objects = set()

    def _List():
      """Lists and returns objects in the bucket. Also records latency."""
      t0 = time.time()
      objects = list(self.gsutil_api.ListObjects(
          self.bucket_url.bucket_name, delimiter='/',
          provider=self.provider, fields=['items/name']))
      t1 = time.time()
      list_latencies.append(t1 - t0)
      return set([obj.data.name for obj in objects])

    self.logger.info(
        'Listing bucket %s waiting for %s objects to appear...',
        self.bucket_url.bucket_name, self.num_objects)
    while expected_objects - found_objects:
      def _ListAfterUpload():
        names = _List()
        found_objects.update(names & expected_objects)
        files_seen.append(len(found_objects))
      self._RunOperation(_ListAfterUpload)
      if expected_objects - found_objects:
        if time.time() - total_start_time > self.MAX_LISTING_WAIT_TIME:
          self.logger.warning('Maximum time reached waiting for listing.')
          break
    total_end_time = time.time()

    self.results['listing']['insert'] = {
        'num_listing_calls': len(list_latencies),
        'list_latencies': list_latencies,
        'files_seen_after_listing': files_seen,
        'time_took': total_end_time - total_start_time,
    }

    args = [object_name for object_name in list_objects]
    self.logger.info(
        'Deleting %s objects for listing test...', self.num_objects)
    self.Apply(_DeleteWrapper, args, _PerfdiagExceptionHandler,
               arg_checker=DummyArgChecker)

    self.logger.info(
        'Listing bucket %s waiting for %s objects to disappear...',
        self.bucket_url.bucket_name, self.num_objects)
    list_latencies = []
    files_seen = []
    total_start_time = time.time()
    found_objects = set(list_objects)
    while found_objects:
      def _ListAfterDelete():
        names = _List()
        found_objects.intersection_update(names)
        files_seen.append(len(found_objects))
      self._RunOperation(_ListAfterDelete)
      if found_objects:
        if time.time() - total_start_time > self.MAX_LISTING_WAIT_TIME:
          self.logger.warning('Maximum time reached waiting for listing.')
          break
    total_end_time = time.time()

    self.results['listing']['delete'] = {
        'num_listing_calls': len(list_latencies),
        'list_latencies': list_latencies,
        'files_seen_after_listing': files_seen,
        'time_took': total_end_time - total_start_time,
    }

  def Upload(self, file_name, object_name, use_file=False, file_start=0,
             file_size=None):
    """Performs an upload to the test bucket.

    The file is uploaded to the bucket referred to by self.bucket_url, and has
    name object_name.

    Args:
      file_name: The path to the local file, and the key to its entry in
                 temp_file_dict.
      object_name: The name of the remote object.
      use_file: If true, use disk I/O, otherwise read everything from memory.
      file_start: The first byte in the file to upload to the object.
                  (should only be specified for sliced uploads)
      file_size: The size of the file to upload.
                 (should only be specified for sliced uploads)

    Returns:
      Uploaded Object Metadata.
    """
    fp = None
    if file_size is None:
      file_size = temp_file_dict[file_name].size

    upload_url = self.bucket_url.Clone()
    upload_url.object_name = object_name
    upload_target = StorageUrlToUploadObjectMetadata(upload_url)

    try:
      if use_file:
        fp = FilePart(file_name, file_start, file_size)
      else:
        data = temp_file_dict[file_name].data[file_start:file_start+file_size]
        fp = cStringIO.StringIO(data)

      def _InnerUpload():
        if file_size < ResumableThreshold():
          return self.gsutil_api.UploadObject(
              fp, upload_target, provider=self.provider, size=file_size,
              fields=['name', 'mediaLink', 'size'])
        else:
          return self.gsutil_api.UploadObjectResumable(
              fp, upload_target, provider=self.provider, size=file_size,
              fields=['name', 'mediaLink', 'size'],
              tracker_callback=_DummyTrackerCallback)
      return self._RunOperation(_InnerUpload)
    finally:
      if fp:
        fp.close()
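
  # Note on _InnerUpload above: uploads smaller than the boto
  # resumable_threshold (8 MiB unless overridden in the boto config) use the
  # one-shot UploadObject path; larger uploads use the resumable API with a
  # no-op tracker callback.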
   1162 
   1163   def Download(self, object_name, file_name=None, serialization_data=None,
   1164                start_byte=0, end_byte=None):
   1165     """Downloads an object from the test bucket.
   1166 
   1167     Args:
   1168       object_name: The name of the object (in the test bucket) to download.
   1169       file_name: Optional file name to write downloaded data to. If None,
   1170                  downloaded data is discarded immediately.
   1171       serialization_data: Optional serialization data, used so that we don't
   1172                           have to get the metadata before downloading.
   1173       start_byte: The first byte in the object to download.
    1174                   (should only be specified for sliced downloads)
   1175       end_byte: The last byte in the object to download.
    1176                 (should only be specified for sliced downloads)
   1177     """
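             # Hypothetical sliced-download call, for illustration only: fetch
             # the first 1 MiB of an object into an existing local file:
             #   self.Download('obj', file_name=fname, start_byte=0,
             #                 end_byte=1048575)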
   1178     fp = None
   1179     try:
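               # 'r+b' requires that the destination file already exist; it lets
               # concurrent slices seek to their own offsets without truncating
               # bytes written by other slices (as 'wb' would).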
   1180       if file_name is not None:
   1181         fp = open(file_name, 'r+b')
   1182         fp.seek(start_byte)
   1183       else:
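                 # No destination file: write to a sink that discards the bytes,
                 # so only network transfer time is measured.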
   1184         fp = self.discard_sink
   1185 
   1186       def _InnerDownload():
   1187         self.gsutil_api.GetObjectMedia(
   1188             self.bucket_url.bucket_name, object_name, fp,
   1189             provider=self.provider, start_byte=start_byte, end_byte=end_byte,
   1190             serialization_data=serialization_data)
   1191       self._RunOperation(_InnerDownload)
   1192     finally:
   1193       if fp:
   1194         fp.close()
   1195 
   1196   def Delete(self, object_name):
   1197     """Deletes an object from the test bucket.
   1198 
   1199     Args:
   1200       object_name: The name of the object to delete.
   1201     """
   1202     try:
   1203       def _InnerDelete():
   1204         self.gsutil_api.DeleteObject(self.bucket_url.bucket_name, object_name,
   1205                                      provider=self.provider)
   1206       self._RunOperation(_InnerDelete)
   1207     except NotFoundException:
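               # The object may already be gone (e.g. from an earlier cleanup);
               # a missing object is not an error for this diagnostic.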
   1208       pass
   1209 
   1210   def _GetDiskCounters(self):
   1211     """Retrieves disk I/O statistics for all disks.
   1212 
   1213     Adapted from the psutil module's psutil._pslinux.disk_io_counters:
   1214       http://code.google.com/p/psutil/source/browse/trunk/psutil/_pslinux.py
   1215 
    1216     Originally distributed under a BSD license.
   1217     Original Copyright (c) 2009, Jay Loden, Dave Daeschler, Giampaolo Rodola.
   1218 
   1219     Returns:
   1220       A dictionary containing disk names mapped to the disk counters from
    1221       /proc/diskstats.
   1222     """
    1223     # iostat documentation states that sectors are equivalent to blocks and
    1224     # have had a size of 512 bytes since the 2.4 kernel series. This value
    1225     # is needed to calculate the amount of disk I/O in bytes.
   1226     sector_size = 512
   1227 
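             # /proc/partitions has a two-line header; keep only device names
             # ending in a digit (e.g. sda1), i.e. partitions, not whole disks.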
   1228     partitions = []
   1229     with open('/proc/partitions', 'r') as f:
   1230       lines = f.readlines()[2:]
   1231       for line in lines:
   1232         _, _, _, name = line.split()
   1233         if name[-1].isdigit():
   1234           partitions.append(name)
   1235 
   1236     retdict = {}
   1237     with open('/proc/diskstats', 'r') as f:
   1238       for line in f:
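               # Each /proc/diskstats row is: major, minor, name, reads
               # completed, reads merged, sectors read, ms reading, writes
               # completed, writes merged, sectors written, ms writing
               # (see the kernel's Documentation/iostats.txt).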
   1239         values = line.split()[:11]
   1240         _, _, name, reads, _, rbytes, rtime, writes, _, wbytes, wtime = values
   1241         if name in partitions:
   1242           rbytes = int(rbytes) * sector_size
   1243           wbytes = int(wbytes) * sector_size
   1244           reads = int(reads)
   1245           writes = int(writes)
   1246           rtime = int(rtime)
   1247           wtime = int(wtime)
   1248           retdict[name] = (reads, writes, rbytes, wbytes, rtime, wtime)
   1249     return retdict
   1250 
   1251   def _GetTcpStats(self):
   1252     """Tries to parse out TCP packet information from netstat output.
   1253 
   1254     Returns:
   1255        A dictionary containing TCP information, or None if netstat is not
   1256        available.
   1257     """
   1258     # netstat return code is non-zero for -s on Linux, so don't raise on error.
   1259     try:
   1260       netstat_output = self._Exec(['netstat', '-s'], return_output=True,
   1261                                   raise_on_error=False)
   1262     except OSError:
   1263       self.logger.warning('netstat not found on your system; some measurement '
   1264                           'data will be missing')
   1265       return None
   1266     netstat_output = netstat_output.strip().lower()
   1267     found_tcp = False
   1268     tcp_retransmit = None
   1269     tcp_received = None
   1270     tcp_sent = None
   1271     for line in netstat_output.split('\n'):
   1272       # Header for TCP section is "Tcp:" in Linux/Mac and
   1273       # "TCP Statistics for" in Windows.
   1274       if 'tcp:' in line or 'tcp statistics' in line:
   1275         found_tcp = True
   1276 
   1277       # Linux == "segments retransmited" (sic), Mac == "retransmit timeouts"
   1278       # Windows == "segments retransmitted".
   1279       if (found_tcp and tcp_retransmit is None and
   1280           ('segments retransmited' in line or 'retransmit timeouts' in line or
   1281            'segments retransmitted' in line)):
   1282         tcp_retransmit = ''.join(c for c in line if c in string.digits)
   1283 
   1284       # Linux+Windows == "segments received", Mac == "packets received".
   1285       if (found_tcp and tcp_received is None and
   1286           ('segments received' in line or 'packets received' in line)):
   1287         tcp_received = ''.join(c for c in line if c in string.digits)
   1288 
   1289       # Linux == "segments send out" (sic), Mac+Windows == "packets sent".
   1290       if (found_tcp and tcp_sent is None and
   1291           ('segments send out' in line or 'packets sent' in line or
   1292            'segments sent' in line)):
   1293         tcp_sent = ''.join(c for c in line if c in string.digits)
   1294 
   1295     result = {}
   1296     try:
   1297       result['tcp_retransmit'] = int(tcp_retransmit)
   1298       result['tcp_received'] = int(tcp_received)
   1299       result['tcp_sent'] = int(tcp_sent)
   1300     except (ValueError, TypeError):
   1301       result['tcp_retransmit'] = None
   1302       result['tcp_received'] = None
   1303       result['tcp_sent'] = None
   1304 
   1305     return result
   1306 
   1307   def _CollectSysInfo(self):
   1308     """Collects system information."""
   1309     sysinfo = {}
   1310 
   1311     # All exceptions that might be raised from socket module calls.
   1312     socket_errors = (
   1313         socket.error, socket.herror, socket.gaierror, socket.timeout)
   1314 
   1315     # Find out whether HTTPS is enabled in Boto.
   1316     sysinfo['boto_https_enabled'] = boto.config.get('Boto', 'is_secure', True)
   1317 
   1318     # Look up proxy info.
   1319     proxy_host = boto.config.get('Boto', 'proxy', None)
   1320     proxy_port = boto.config.getint('Boto', 'proxy_port', 0)
   1321     sysinfo['using_proxy'] = bool(proxy_host)
   1322 
   1323     if boto.config.get('Boto', 'proxy_rdns', False):
   1324       self.logger.info('DNS lookups are disallowed in this environment, so '
   1325                        'some information is not included in this perfdiag run.')
   1326 
   1327     # Get the local IP address from socket lib.
   1328     try:
   1329       sysinfo['ip_address'] = socket.gethostbyname(socket.gethostname())
   1330     except socket_errors:
   1331       sysinfo['ip_address'] = ''
   1332     # Record the temporary directory used since it can affect performance, e.g.
   1333     # when on a networked filesystem.
   1334     sysinfo['tempdir'] = self.directory
   1335 
   1336     # Produces an RFC 2822 compliant GMT timestamp.
   1337     sysinfo['gmt_timestamp'] = time.strftime('%a, %d %b %Y %H:%M:%S +0000',
   1338                                              time.gmtime())
   1339 
   1340     # Execute a CNAME lookup on Google DNS to find what Google server
   1341     # it's routing to.
   1342     cmd = ['nslookup', '-type=CNAME', self.XML_API_HOST]
   1343     try:
   1344       nslookup_cname_output = self._Exec(cmd, return_output=True)
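               # The answer typically includes a line such as
               # '... canonical name = storage.l.googleapis.com.'; the regex
               # captures the first label of the CNAME target.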
   1345       m = re.search(r' = (?P<googserv>[^.]+)\.', nslookup_cname_output)
   1346       sysinfo['googserv_route'] = m.group('googserv') if m else None
   1347     except (CommandException, OSError):
   1348       sysinfo['googserv_route'] = ''
   1349 
   1350     # Try to determine the latency of a DNS lookup for the Google hostname
   1351     # endpoint. Note: we don't piggyback on gethostbyname_ex below because
   1352     # the _ex version requires an extra RTT.
   1353     try:
   1354       t0 = time.time()
   1355       socket.gethostbyname(self.XML_API_HOST)
   1356       t1 = time.time()
   1357       sysinfo['google_host_dns_latency'] = t1 - t0
   1358     except socket_errors:
   1359       pass
   1360 
   1361     # Look up IP addresses for Google Server.
   1362     try:
   1363       (hostname, _, ipaddrlist) = socket.gethostbyname_ex(self.XML_API_HOST)
   1364       sysinfo['googserv_ips'] = ipaddrlist
   1365     except socket_errors:
   1366       ipaddrlist = []
   1367       sysinfo['googserv_ips'] = []
   1368 
   1369     # Reverse lookup the hostnames for the Google Server IPs.
   1370     sysinfo['googserv_hostnames'] = []
   1371     for googserv_ip in ipaddrlist:
   1372       try:
    1373         (hostname, _, _) = socket.gethostbyaddr(googserv_ip)  # Keep ipaddrlist intact.
   1374         sysinfo['googserv_hostnames'].append(hostname)
   1375       except socket_errors:
   1376         pass
   1377 
   1378     # Query o-o to find out what the Google DNS thinks is the user's IP.
   1379     try:
   1380       cmd = ['nslookup', '-type=TXT', 'o-o.myaddr.google.com.']
   1381       nslookup_txt_output = self._Exec(cmd, return_output=True)
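               # The TXT record holds the caller's IP as seen by Google DNS,
               # e.g. text = "203.0.113.7"; the regex captures the dotted quad.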
   1382       m = re.search(r'text\s+=\s+"(?P<dnsip>[\.\d]+)"', nslookup_txt_output)
   1383       sysinfo['dns_o-o_ip'] = m.group('dnsip') if m else None
   1384     except (CommandException, OSError):
   1385       sysinfo['dns_o-o_ip'] = ''
   1386 
   1387     # Try to determine the latency of connecting to the Google hostname
   1388     # endpoint.
   1389     sysinfo['google_host_connect_latencies'] = {}
   1390     for googserv_ip in ipaddrlist:
   1391       try:
   1392         sock = socket.socket()
   1393         t0 = time.time()
   1394         sock.connect((googserv_ip, self.XML_API_PORT))
   1395         t1 = time.time()
   1396         sysinfo['google_host_connect_latencies'][googserv_ip] = t1 - t0
   1397       except socket_errors:
   1398         pass
   1399 
   1400     # If using a proxy, try to determine the latency of a DNS lookup to resolve
   1401     # the proxy hostname and the latency of connecting to the proxy.
   1402     if proxy_host:
   1403       proxy_ip = None
   1404       try:
   1405         t0 = time.time()
   1406         proxy_ip = socket.gethostbyname(proxy_host)
   1407         t1 = time.time()
   1408         sysinfo['proxy_dns_latency'] = t1 - t0
   1409       except socket_errors:
   1410         pass
   1411 
   1412       try:
   1413         sock = socket.socket()
   1414         t0 = time.time()
   1415         sock.connect((proxy_ip or proxy_host, proxy_port))
   1416         t1 = time.time()
   1417         sysinfo['proxy_host_connect_latency'] = t1 - t0
   1418       except socket_errors:
   1419         pass
   1420 
    1421     # Try to find the number of CPUs in the system, if available.
   1422     try:
   1423       sysinfo['cpu_count'] = multiprocessing.cpu_count()
   1424     except NotImplementedError:
   1425       sysinfo['cpu_count'] = None
   1426 
   1427     # For *nix platforms, obtain the CPU load.
   1428     try:
   1429       sysinfo['load_avg'] = list(os.getloadavg())
   1430     except (AttributeError, OSError):
   1431       sysinfo['load_avg'] = None
   1432 
    1433     # Try to collect memory information from /proc/meminfo, if possible.
   1434     mem_total = None
   1435     mem_free = None
   1436     mem_buffers = None
   1437     mem_cached = None
   1438 
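             # /proc/meminfo reports sizes in kB; each matching line below is
             # reduced to its digits and scaled by 1000 to approximate bytes.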
   1439     try:
   1440       with open('/proc/meminfo', 'r') as f:
   1441         for line in f:
   1442           if line.startswith('MemTotal'):
   1443             mem_total = (int(''.join(c for c in line if c in string.digits))
   1444                          * 1000)
   1445           elif line.startswith('MemFree'):
   1446             mem_free = (int(''.join(c for c in line if c in string.digits))
   1447                         * 1000)
   1448           elif line.startswith('Buffers'):
   1449             mem_buffers = (int(''.join(c for c in line if c in string.digits))
   1450                            * 1000)
   1451           elif line.startswith('Cached'):
   1452             mem_cached = (int(''.join(c for c in line if c in string.digits))
   1453                           * 1000)
   1454     except (IOError, ValueError):
   1455       pass
   1456 
   1457     sysinfo['meminfo'] = {'mem_total': mem_total,
   1458                           'mem_free': mem_free,
   1459                           'mem_buffers': mem_buffers,
   1460                           'mem_cached': mem_cached}
   1461 
   1462     # Get configuration attributes from config module.
   1463     sysinfo['gsutil_config'] = {}
   1464     for attr in dir(config):
   1465       attr_value = getattr(config, attr)
   1466       # Filter out multiline strings that are not useful.
   1467       if attr.isupper() and not (isinstance(attr_value, basestring) and
   1468                                  '\n' in attr_value):
   1469         sysinfo['gsutil_config'][attr] = attr_value
   1470 
   1471     sysinfo['tcp_proc_values'] = {}
   1472     stats_to_check = [
   1473         '/proc/sys/net/core/rmem_default',
   1474         '/proc/sys/net/core/rmem_max',
   1475         '/proc/sys/net/core/wmem_default',
   1476         '/proc/sys/net/core/wmem_max',
   1477         '/proc/sys/net/ipv4/tcp_timestamps',
   1478         '/proc/sys/net/ipv4/tcp_sack',
   1479         '/proc/sys/net/ipv4/tcp_window_scaling',
   1480     ]
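             # These sysctls control socket buffer sizes and TCP features
             # (timestamps, selective ACK, window scaling) that can affect
             # throughput; record them for later analysis.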
   1481     for fname in stats_to_check:
   1482       try:
   1483         with open(fname, 'r') as f:
   1484           value = f.read()
   1485         sysinfo['tcp_proc_values'][os.path.basename(fname)] = value.strip()
   1486       except IOError:
   1487         pass
   1488 
   1489     self.results['sysinfo'] = sysinfo
   1490 
   1491   def _DisplayStats(self, trials):
   1492     """Prints out mean, standard deviation, median, and 90th percentile."""
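             # Note: this is the population standard deviation (divides by n),
             # which describes the observed trials themselves.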
   1493     n = len(trials)
   1494     mean = float(sum(trials)) / n
   1495     stdev = math.sqrt(sum((x - mean)**2 for x in trials) / n)
   1496 
   1497     print str(n).rjust(6), '',
   1498     print ('%.1f' % (mean * 1000)).rjust(9), '',
   1499     print ('%.1f' % (stdev * 1000)).rjust(12), '',
   1500     print ('%.1f' % (Percentile(trials, 0.5) * 1000)).rjust(11), '',
   1501     print ('%.1f' % (Percentile(trials, 0.9) * 1000)).rjust(11), ''
   1502 
   1503   def _DisplayResults(self):
   1504     """Displays results collected from diagnostic run."""
   1505     print
   1506     print '=' * 78
   1507     print 'DIAGNOSTIC RESULTS'.center(78)
   1508     print '=' * 78
   1509 
   1510     if 'latency' in self.results:
   1511       print
   1512       print '-' * 78
   1513       print 'Latency'.center(78)
   1514       print '-' * 78
   1515       print ('Operation       Size  Trials  Mean (ms)  Std Dev (ms)  '
   1516              'Median (ms)  90th % (ms)')
   1517       print ('=========  =========  ======  =========  ============  '
   1518              '===========  ===========')
   1519       for key in sorted(self.results['latency']):
   1520         trials = sorted(self.results['latency'][key])
   1521         op, numbytes = key.split('_')
   1522         numbytes = int(numbytes)
   1523         if op == 'METADATA':
   1524           print 'Metadata'.rjust(9), '',
   1525           print MakeHumanReadable(numbytes).rjust(9), '',
   1526           self._DisplayStats(trials)
   1527         if op == 'DOWNLOAD':
   1528           print 'Download'.rjust(9), '',
   1529           print MakeHumanReadable(numbytes).rjust(9), '',
   1530           self._DisplayStats(trials)
   1531         if op == 'UPLOAD':
   1532           print 'Upload'.rjust(9), '',
   1533           print MakeHumanReadable(numbytes).rjust(9), '',
   1534           self._DisplayStats(trials)
   1535         if op == 'DELETE':
   1536           print 'Delete'.rjust(9), '',
   1537           print MakeHumanReadable(numbytes).rjust(9), '',
   1538           self._DisplayStats(trials)
   1539 
   1540     if 'write_throughput' in self.results:
   1541       print
   1542       print '-' * 78
   1543       print 'Write Throughput'.center(78)
   1544       print '-' * 78
   1545       write_thru = self.results['write_throughput']
   1546       print 'Copied %s %s file(s) for a total transfer size of %s.' % (
   1547           self.num_objects,
   1548           MakeHumanReadable(write_thru['file_size']),
   1549           MakeHumanReadable(write_thru['total_bytes_copied']))
   1550       print 'Write throughput: %s/s.' % (
   1551           MakeBitsHumanReadable(write_thru['bytes_per_second'] * 8))
   1552       print 'Parallelism strategy: %s' % write_thru['parallelism']
   1553 
   1554     if 'write_throughput_file' in self.results:
   1555       print
   1556       print '-' * 78
   1557       print 'Write Throughput With File I/O'.center(78)
   1558       print '-' * 78
   1559       write_thru_file = self.results['write_throughput_file']
   1560       print 'Copied %s %s file(s) for a total transfer size of %s.' % (
   1561           self.num_objects,
   1562           MakeHumanReadable(write_thru_file['file_size']),
   1563           MakeHumanReadable(write_thru_file['total_bytes_copied']))
   1564       print 'Write throughput: %s/s.' % (
   1565           MakeBitsHumanReadable(write_thru_file['bytes_per_second'] * 8))
   1566       print 'Parallelism strategy: %s' % write_thru_file['parallelism']
   1567 
   1568     if 'read_throughput' in self.results:
   1569       print
   1570       print '-' * 78
   1571       print 'Read Throughput'.center(78)
   1572       print '-' * 78
   1573       read_thru = self.results['read_throughput']
   1574       print 'Copied %s %s file(s) for a total transfer size of %s.' % (
   1575           self.num_objects,
   1576           MakeHumanReadable(read_thru['file_size']),
   1577           MakeHumanReadable(read_thru['total_bytes_copied']))
   1578       print 'Read throughput: %s/s.' % (
   1579           MakeBitsHumanReadable(read_thru['bytes_per_second'] * 8))
   1580       print 'Parallelism strategy: %s' % read_thru['parallelism']
   1581 
   1582     if 'read_throughput_file' in self.results:
   1583       print
   1584       print '-' * 78
   1585       print 'Read Throughput With File I/O'.center(78)
   1586       print '-' * 78
   1587       read_thru_file = self.results['read_throughput_file']
   1588       print 'Copied %s %s file(s) for a total transfer size of %s.' % (
   1589           self.num_objects,
   1590           MakeHumanReadable(read_thru_file['file_size']),
   1591           MakeHumanReadable(read_thru_file['total_bytes_copied']))
   1592       print 'Read throughput: %s/s.' % (
   1593           MakeBitsHumanReadable(read_thru_file['bytes_per_second'] * 8))
   1594       print 'Parallelism strategy: %s' % read_thru_file['parallelism']
   1595 
   1596     if 'listing' in self.results:
   1597       print
   1598       print '-' * 78
   1599       print 'Listing'.center(78)
   1600       print '-' * 78
   1601 
   1602       listing = self.results['listing']
   1603       insert = listing['insert']
   1604       delete = listing['delete']
   1605       print 'After inserting %s objects:' % listing['num_files']
   1606       print ('  Total time for objects to appear: %.2g seconds' %
   1607              insert['time_took'])
   1608       print '  Number of listing calls made: %s' % insert['num_listing_calls']
   1609       print ('  Individual listing call latencies: [%s]' %
   1610              ', '.join('%.2gs' % lat for lat in insert['list_latencies']))
   1611       print ('  Files reflected after each call: [%s]' %
   1612              ', '.join(map(str, insert['files_seen_after_listing'])))
   1613 
   1614       print 'After deleting %s objects:' % listing['num_files']
    1615       print ('  Total time for objects to disappear: %.2g seconds' %
   1616              delete['time_took'])
   1617       print '  Number of listing calls made: %s' % delete['num_listing_calls']
   1618       print ('  Individual listing call latencies: [%s]' %
   1619              ', '.join('%.2gs' % lat for lat in delete['list_latencies']))
   1620       print ('  Files reflected after each call: [%s]' %
   1621              ', '.join(map(str, delete['files_seen_after_listing'])))
   1622 
   1623     if 'sysinfo' in self.results:
   1624       print
   1625       print '-' * 78
   1626       print 'System Information'.center(78)
   1627       print '-' * 78
   1628       info = self.results['sysinfo']
   1629       print 'IP Address: \n  %s' % info['ip_address']
   1630       print 'Temporary Directory: \n  %s' % info['tempdir']
   1631       print 'Bucket URI: \n  %s' % self.results['bucket_uri']
   1632       print 'gsutil Version: \n  %s' % self.results.get('gsutil_version',
   1633                                                         'Unknown')
   1634       print 'boto Version: \n  %s' % self.results.get('boto_version', 'Unknown')
   1635 
   1636       if 'gmt_timestamp' in info:
   1637         ts_string = info['gmt_timestamp']
   1638         timetuple = None
   1639         try:
    1640           # Parse the RFC 2822 string into a GMT time tuple.
   1641           timetuple = time.strptime(ts_string, '%a, %d %b %Y %H:%M:%S +0000')
   1642         except ValueError:
   1643           pass
   1644 
   1645         if timetuple:
    1646           # Convert the GMT time tuple to a Unix timestamp for local display.
   1647           localtime = calendar.timegm(timetuple)
   1648           localdt = datetime.datetime.fromtimestamp(localtime)
    1649           print 'Measurement time: \n  %s' % localdt.strftime(
   1650               '%Y-%m-%d %I:%M:%S %p %Z')
   1651 
   1652       print 'Google Server: \n  %s' % info['googserv_route']
   1653       print ('Google Server IP Addresses: \n  %s' %
   1654              ('\n  '.join(info['googserv_ips'])))
   1655       print ('Google Server Hostnames: \n  %s' %
   1656              ('\n  '.join(info['googserv_hostnames'])))
   1657       print 'Google DNS thinks your IP is: \n  %s' % info['dns_o-o_ip']
   1658       print 'CPU Count: \n  %s' % info['cpu_count']
   1659       print 'CPU Load Average: \n  %s' % info['load_avg']
   1660       try:
   1661         print ('Total Memory: \n  %s' %
   1662                MakeHumanReadable(info['meminfo']['mem_total']))
   1663         # Free memory is really MemFree + Buffers + Cached.
   1664         print 'Free Memory: \n  %s' % MakeHumanReadable(
   1665             info['meminfo']['mem_free'] +
   1666             info['meminfo']['mem_buffers'] +
   1667             info['meminfo']['mem_cached'])
   1668       except TypeError:
   1669         pass
   1670 
   1671       if 'netstat_end' in info and 'netstat_start' in info:
   1672         netstat_after = info['netstat_end']
   1673         netstat_before = info['netstat_start']
   1674         for tcp_type in ('sent', 'received', 'retransmit'):
   1675           try:
   1676             delta = (netstat_after['tcp_%s' % tcp_type] -
   1677                      netstat_before['tcp_%s' % tcp_type])
   1678             print 'TCP segments %s during test:\n  %d' % (tcp_type, delta)
   1679           except TypeError:
   1680             pass
   1681       else:
   1682         print ('TCP segment counts not available because "netstat" was not '
   1683                'found during test runs')
   1684 
   1685       if 'disk_counters_end' in info and 'disk_counters_start' in info:
   1686         print 'Disk Counter Deltas:\n',
   1687         disk_after = info['disk_counters_end']
   1688         disk_before = info['disk_counters_start']
   1689         print '', 'disk'.rjust(6),
   1690         for colname in ['reads', 'writes', 'rbytes', 'wbytes', 'rtime',
   1691                         'wtime']:
   1692           print colname.rjust(8),
   1693         print
   1694         for diskname in sorted(disk_after):
   1695           before = disk_before[diskname]
   1696           after = disk_after[diskname]
   1697           (reads1, writes1, rbytes1, wbytes1, rtime1, wtime1) = before
   1698           (reads2, writes2, rbytes2, wbytes2, rtime2, wtime2) = after
   1699           print '', diskname.rjust(6),
   1700           deltas = [reads2-reads1, writes2-writes1, rbytes2-rbytes1,
   1701                     wbytes2-wbytes1, rtime2-rtime1, wtime2-wtime1]
   1702           for delta in deltas:
   1703             print str(delta).rjust(8),
   1704           print
   1705 
   1706       if 'tcp_proc_values' in info:
   1707         print 'TCP /proc values:\n',
   1708         for item in info['tcp_proc_values'].iteritems():
   1709           print '   %s = %s' % item
   1710 
   1711       if 'boto_https_enabled' in info:
   1712         print 'Boto HTTPS Enabled: \n  %s' % info['boto_https_enabled']
   1713 
   1714       if 'using_proxy' in info:
   1715         print 'Requests routed through proxy: \n  %s' % info['using_proxy']
   1716 
   1717       if 'google_host_dns_latency' in info:
   1718         print ('Latency of the DNS lookup for Google Storage server (ms): '
   1719                '\n  %.1f' % (info['google_host_dns_latency'] * 1000.0))
   1720 
   1721       if 'google_host_connect_latencies' in info:
   1722         print 'Latencies connecting to Google Storage server IPs (ms):'
   1723         for ip, latency in info['google_host_connect_latencies'].iteritems():
   1724           print '  %s = %.1f' % (ip, latency * 1000.0)
   1725 
   1726       if 'proxy_dns_latency' in info:
   1727         print ('Latency of the DNS lookup for the configured proxy (ms): '
   1728                '\n  %.1f' % (info['proxy_dns_latency'] * 1000.0))
   1729 
   1730       if 'proxy_host_connect_latency' in info:
   1731         print ('Latency connecting to the configured proxy (ms): \n  %.1f' %
   1732                (info['proxy_host_connect_latency'] * 1000.0))
   1733 
   1734     if 'request_errors' in self.results and 'total_requests' in self.results:
   1735       print
   1736       print '-' * 78
   1737       print 'In-Process HTTP Statistics'.center(78)
   1738       print '-' * 78
   1739       total = int(self.results['total_requests'])
   1740       numerrors = int(self.results['request_errors'])
   1741       numbreaks = int(self.results['connection_breaks'])
   1742       availability = (((total - numerrors) / float(total)) * 100
   1743                       if total > 0 else 100)
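               # Availability is the fraction of requests that succeeded, e.g.
               # 1000 requests with 2 errors yields 99.8%.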
   1744       print 'Total HTTP requests made: %d' % total
   1745       print 'HTTP 5xx errors: %d' % numerrors
   1746       print 'HTTP connections broken: %d' % numbreaks
   1747       print 'Availability: %.7g%%' % availability
   1748       if 'error_responses_by_code' in self.results:
   1749         sorted_codes = sorted(
   1750             self.results['error_responses_by_code'].iteritems())
   1751         if sorted_codes:
   1752           print 'Error responses by code:'
   1753           print '\n'.join('  %s: %s' % c for c in sorted_codes)
   1754 
   1755     if self.output_file:
   1756       with open(self.output_file, 'w') as f:
   1757         json.dump(self.results, f, indent=2)
   1758       print
   1759       print "Output file written to '%s'." % self.output_file
   1760 
   1761     print
   1762 
   1763   def _ParsePositiveInteger(self, val, msg):
   1764     """Tries to convert val argument to a positive integer.
   1765 
   1766     Args:
   1767       val: The value (as a string) to convert to a positive integer.
   1768       msg: The error message to place in the CommandException on an error.
   1769 
   1770     Returns:
   1771       A valid positive integer.
   1772 
   1773     Raises:
   1774       CommandException: If the supplied value is not a valid positive integer.
   1775     """
   1776     try:
   1777       val = int(val)
   1778       if val < 1:
   1779         raise CommandException(msg)
   1780       return val
   1781     except ValueError:
   1782       raise CommandException(msg)
   1783 
   1784   def _ParseArgs(self):
   1785     """Parses arguments for perfdiag command."""
   1786     # From -n.
   1787     self.num_objects = 5
   1788     # From -c.
   1789     self.processes = 1
   1790     # From -k.
   1791     self.threads = 1
    1792     # From -p.
    1793     self.parallel_strategy = None
    1794     # From -y.
    1795     self.num_slices = 4
   1795     self.num_slices = 4
   1796     # From -s.
   1797     self.thru_filesize = 1048576
   1798     # From -d.
   1799     self.directory = tempfile.gettempdir()
   1800     # Keep track of whether or not to delete the directory upon completion.
   1801     self.delete_directory = False
   1802     # From -t.
   1803     self.diag_tests = set(self.DEFAULT_DIAG_TESTS)
   1804     # From -o.
   1805     self.output_file = None
   1806     # From -i.
   1807     self.input_file = None
   1808     # From -m.
   1809     self.metadata_keys = {}
   1810 
   1811     if self.sub_opts:
   1812       for o, a in self.sub_opts:
   1813         if o == '-n':
   1814           self.num_objects = self._ParsePositiveInteger(
   1815               a, 'The -n parameter must be a positive integer.')
   1816         if o == '-c':
   1817           self.processes = self._ParsePositiveInteger(
   1818               a, 'The -c parameter must be a positive integer.')
   1819         if o == '-k':
   1820           self.threads = self._ParsePositiveInteger(
   1821               a, 'The -k parameter must be a positive integer.')
   1822         if o == '-p':
   1823           if a.lower() in self.PARALLEL_STRATEGIES:
   1824             self.parallel_strategy = a.lower()
   1825           else:
   1826             raise CommandException(
   1827                 "'%s' is not a valid parallelism strategy." % a)
   1828         if o == '-y':
   1829           self.num_slices = self._ParsePositiveInteger(
   1830               a, 'The -y parameter must be a positive integer.')
   1831         if o == '-s':
   1832           try:
   1833             self.thru_filesize = HumanReadableToBytes(a)
   1834           except ValueError:
   1835             raise CommandException('Invalid -s parameter.')
   1836         if o == '-d':
   1837           self.directory = a
   1838           if not os.path.exists(self.directory):
   1839             self.delete_directory = True
   1840             os.makedirs(self.directory)
   1841         if o == '-t':
   1842           self.diag_tests = set()
   1843           for test_name in a.strip().split(','):
   1844             if test_name.lower() not in self.ALL_DIAG_TESTS:
   1845               raise CommandException("List of test names (-t) contains invalid "
   1846                                      "test name '%s'." % test_name)
    1847             self.diag_tests.add(test_name.lower())
   1848         if o == '-m':
   1849           pieces = a.split(':')
   1850           if len(pieces) != 2:
   1851             raise CommandException(
   1852                 "Invalid metadata key-value combination '%s'." % a)
   1853           key, value = pieces
   1854           self.metadata_keys[key] = value
   1855         if o == '-o':
   1856           self.output_file = os.path.abspath(a)
   1857         if o == '-i':
   1858           self.input_file = os.path.abspath(a)
   1859           if not os.path.isfile(self.input_file):
   1860             raise CommandException("Invalid input file (-i): '%s'." % a)
   1861           try:
   1862             with open(self.input_file, 'r') as f:
   1863               self.results = json.load(f)
   1864               self.logger.info("Read input file: '%s'.", self.input_file)
   1865           except ValueError:
   1866             raise CommandException("Could not decode input file (-i): '%s'." %
   1867                                    a)
   1868           return
   1869 
   1870     # If parallelism is specified, default parallelism strategy to fan.
   1871     if (self.processes > 1 or self.threads > 1) and not self.parallel_strategy:
   1872       self.parallel_strategy = self.FAN
   1873     elif self.processes == 1 and self.threads == 1 and self.parallel_strategy:
   1874       raise CommandException(
   1875           'Cannot specify parallelism strategy (-p) without also specifying '
    1876           'multiple processes and/or threads (-c and/or -k).')
   1877 
   1878     if not self.args:
   1879       self.RaiseWrongNumberOfArgumentsException()
   1880 
   1881     self.bucket_url = StorageUrlFromString(self.args[0])
   1882     self.provider = self.bucket_url.scheme
    1883     if not (self.bucket_url.IsCloudUrl() and self.bucket_url.IsBucket()):
   1884       raise CommandException('The perfdiag command requires a URL that '
   1885                              'specifies a bucket.\n"%s" is not '
   1886                              'valid.' % self.args[0])
   1887 
   1888     if (self.thru_filesize > HumanReadableToBytes('2GiB') and
   1889         (self.RTHRU in self.diag_tests or self.WTHRU in self.diag_tests)):
   1890       raise CommandException(
    1891           'For in-memory tests, the maximum file size is 2GiB. For larger file '
   1892           'sizes, specify rthru_file and/or wthru_file with the -t option.')
   1893 
   1894     perform_slice = self.parallel_strategy in (self.SLICE, self.BOTH)
   1895     slice_not_available = (
    1896         self.provider == 's3' and self.diag_tests.intersection(
    1897             (self.WTHRU, self.WTHRU_FILE)))
   1898     if perform_slice and slice_not_available:
   1899       raise CommandException('Sliced uploads are not available for s3. '
   1900                              'Use -p fan or sequential uploads for s3.')
   1901 
   1902     # Ensure the bucket exists.
   1903     self.gsutil_api.GetBucket(self.bucket_url.bucket_name,
   1904                               provider=self.bucket_url.scheme,
   1905                               fields=['id'])
   1906     self.exceptions = [httplib.HTTPException, socket.error, socket.gaierror,
   1907                        socket.timeout, httplib.BadStatusLine,
   1908                        ServiceException]
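             # _RunOperation counts errors of these types (rather than
             # propagating them) so the diagnostic can continue and report them.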
   1909 
   1910   # Command entry point.
   1911   def RunCommand(self):
   1912     """Called by gsutil when the command is being invoked."""
   1913     self._ParseArgs()
   1914 
   1915     if self.input_file:
   1916       self._DisplayResults()
   1917       return 0
   1918 
   1919     # We turn off retries in the underlying boto library because the
   1920     # _RunOperation function handles errors manually so it can count them.
   1921     boto.config.set('Boto', 'num_retries', '0')
   1922 
   1923     self.logger.info(
   1924         'Number of iterations to run: %d\n'
   1925         'Base bucket URI: %s\n'
   1926         'Number of processes: %d\n'
   1927         'Number of threads: %d\n'
   1928         'Parallelism strategy: %s\n'
   1929         'Throughput file size: %s\n'
   1930         'Diagnostics to run: %s',
   1931         self.num_objects,
   1932         self.bucket_url,
   1933         self.processes,
   1934         self.threads,
   1935         self.parallel_strategy,
   1936         MakeHumanReadable(self.thru_filesize),
   1937         (', '.join(self.diag_tests)))
   1938 
   1939     try:
   1940       self._SetUp()
   1941 
   1942       # Collect generic system info.
   1943       self._CollectSysInfo()
   1944       # Collect netstat info and disk counters before tests (and again later).
   1945       netstat_output = self._GetTcpStats()
   1946       if netstat_output:
   1947         self.results['sysinfo']['netstat_start'] = netstat_output
   1948       if IS_LINUX:
   1949         self.results['sysinfo']['disk_counters_start'] = self._GetDiskCounters()
   1950       # Record bucket URL.
   1951       self.results['bucket_uri'] = str(self.bucket_url)
   1952       self.results['json_format'] = 'perfdiag'
   1953       self.results['metadata'] = self.metadata_keys
   1954 
   1955       if self.LAT in self.diag_tests:
   1956         self._RunLatencyTests()
   1957       if self.RTHRU in self.diag_tests:
   1958         self._RunReadThruTests()
   1959       # Run WTHRU_FILE before RTHRU_FILE. If data is created in WTHRU_FILE it
   1960       # will be used in RTHRU_FILE to save time and bandwidth.
   1961       if self.WTHRU_FILE in self.diag_tests:
   1962         self._RunWriteThruTests(use_file=True)
   1963       if self.RTHRU_FILE in self.diag_tests:
   1964         self._RunReadThruTests(use_file=True)
   1965       if self.WTHRU in self.diag_tests:
   1966         self._RunWriteThruTests()
   1967       if self.LIST in self.diag_tests:
   1968         self._RunListTests()
   1969 
   1970       # Collect netstat info and disk counters after tests.
   1971       netstat_output = self._GetTcpStats()
   1972       if netstat_output:
   1973         self.results['sysinfo']['netstat_end'] = netstat_output
   1974       if IS_LINUX:
   1975         self.results['sysinfo']['disk_counters_end'] = self._GetDiskCounters()
   1976 
   1977       self.results['total_requests'] = self.total_requests
   1978       self.results['request_errors'] = self.request_errors
   1979       self.results['error_responses_by_code'] = self.error_responses_by_code
   1980       self.results['connection_breaks'] = self.connection_breaks
   1981       self.results['gsutil_version'] = gslib.VERSION
   1982       self.results['boto_version'] = boto.__version__
   1983 
   1984       self._TearDown()
   1985       self._DisplayResults()
   1986     finally:
   1987       # TODO: Install signal handlers so this is performed in response to a
   1988       # terminating signal; consider multi-threaded object deletes during
   1989       # cleanup so it happens quickly.
   1990       self._TearDown()
   1991 
   1992     return 0
   1993 
   1994 
   1995 def StorageUrlToUploadObjectMetadata(storage_url):
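           """Converts a cloud object StorageUrl into an upload target.

           Args:
             storage_url: StorageUrl naming a cloud object.

           Returns:
             apitools Object with its name and bucket fields populated.

           Raises:
             CommandException: If storage_url does not name a cloud object.
           """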
   1996   if storage_url.IsCloudUrl() and storage_url.IsObject():
   1997     upload_target = apitools_messages.Object()
   1998     upload_target.name = storage_url.object_name
   1999     upload_target.bucket = storage_url.bucket_name
   2000     return upload_target
   2001   else:
   2002     raise CommandException('Non-cloud URL upload target %s was created in '
    2003                            'perfdiag implementation.' % storage_url)
   2004