Home | History | Annotate | Download | only in ccbench
      1 # -*- coding: utf-8 -*-

      2 # This file should be kept compatible with both Python 2.6 and Python >= 3.0.

      3 
      4 from __future__ import division
      5 from __future__ import print_function
      6 
      7 """
      8 ccbench, a Python concurrency benchmark.
      9 """
     10 
     11 import time
     12 import os
     13 import sys
     14 import functools
     15 import itertools
     16 import threading
     17 import subprocess
     18 import socket
     19 from optparse import OptionParser, SUPPRESS_HELP
     20 import platform
     21 
     22 # Compatibility

     23 try:
     24     xrange
     25 except NameError:
     26     xrange = range
     27 
     28 try:
     29     map = itertools.imap
     30 except AttributeError:
     31     pass
     32 
     33 
     34 THROUGHPUT_DURATION = 2.0
     35 
     36 LATENCY_PING_INTERVAL = 0.1
     37 LATENCY_DURATION = 2.0
     38 
     39 BANDWIDTH_PACKET_SIZE = 1024
     40 BANDWIDTH_DURATION = 2.0
     41 
     42 
     43 def task_pidigits():
     44     """Pi calculation (Python)"""
     45     _map = map
     46     _count = itertools.count
     47     _islice = itertools.islice
     48 
     49     def calc_ndigits(n):
     50         # From http://shootout.alioth.debian.org/

     51         def gen_x():
     52             return _map(lambda k: (k, 4*k + 2, 0, 2*k + 1), _count(1))
     53 
     54         def compose(a, b):
     55             aq, ar, as_, at = a
     56             bq, br, bs, bt = b
     57             return (aq * bq,
     58                     aq * br + ar * bt,
     59                     as_ * bq + at * bs,
     60                     as_ * br + at * bt)
     61 
     62         def extract(z, j):
     63             q, r, s, t = z
     64             return (q*j + r) // (s*j + t)
     65 
     66         def pi_digits():
     67             z = (1, 0, 0, 1)
     68             x = gen_x()
     69             while 1:
     70                 y = extract(z, 3)
     71                 while y != extract(z, 4):
     72                     z = compose(z, next(x))
     73                     y = extract(z, 3)
     74                 z = compose((10, -10*y, 0, 1), z)
     75                 yield y
     76 
     77         return list(_islice(pi_digits(), n))
     78 
     79     return calc_ndigits, (50, )
     80 
     81 def task_regex():
     82     """regular expression (C)"""
     83     # XXX this task gives horrendous latency results.

     84     import re
     85     # Taken from the `inspect` module

     86     pat = re.compile(r'^(\s*def\s)|(.*(?<!\w)lambda(:|\s))|^(\s*@)', re.MULTILINE)
     87     with open(__file__, "r") as f:
     88         arg = f.read(2000)
     89 
     90     def findall(s):
     91         t = time.time()
     92         try:
     93             return pat.findall(s)
     94         finally:
     95             print(time.time() - t)
     96     return pat.findall, (arg, )
     97 
     98 def task_sort():
     99     """list sorting (C)"""
    100     def list_sort(l):
    101         l = l[::-1]
    102         l.sort()
    103 
    104     return list_sort, (list(range(1000)), )
    105 
    106 def task_compress_zlib():
    107     """zlib compression (C)"""
    108     import zlib
    109     with open(__file__, "rb") as f:
    110         arg = f.read(5000) * 3
    111 
    112     def compress(s):
    113         zlib.decompress(zlib.compress(s, 5))
    114     return compress, (arg, )
    115 
    116 def task_compress_bz2():
    117     """bz2 compression (C)"""
    118     import bz2
    119     with open(__file__, "rb") as f:
    120         arg = f.read(3000) * 2
    121 
    122     def compress(s):
    123         bz2.compress(s)
    124     return compress, (arg, )
    125 
    126 def task_hashing():
    127     """SHA1 hashing (C)"""
    128     import hashlib
    129     with open(__file__, "rb") as f:
    130         arg = f.read(5000) * 30
    131 
    132     def compute(s):
    133         hashlib.sha1(s).digest()
    134     return compute, (arg, )
    135 
    136 
    137 throughput_tasks = [task_pidigits, task_regex]
    138 for mod in 'bz2', 'hashlib':
    139     try:
    140         globals()[mod] = __import__(mod)
    141     except ImportError:
    142         globals()[mod] = None
    143 
    144 # For whatever reasons, zlib gives irregular results, so we prefer bz2 or

    145 # hashlib if available.

    146 # (NOTE: hashlib releases the GIL from 2.7 and 3.1 onwards)

    147 if bz2 is not None:
    148     throughput_tasks.append(task_compress_bz2)
    149 elif hashlib is not None:
    150     throughput_tasks.append(task_hashing)
    151 else:
    152     throughput_tasks.append(task_compress_zlib)
    153 
    154 latency_tasks = throughput_tasks
    155 bandwidth_tasks = [task_pidigits]
    156 
    157 
    158 class TimedLoop:
    159     def __init__(self, func, args):
    160         self.func = func
    161         self.args = args
    162 
    163     def __call__(self, start_time, min_duration, end_event, do_yield=False):
    164         step = 20
    165         niters = 0
    166         duration = 0.0
    167         _time = time.time
    168         _sleep = time.sleep
    169         _func = self.func
    170         _args = self.args
    171         t1 = start_time
    172         while True:
    173             for i in range(step):
    174                 _func(*_args)
    175             t2 = _time()
    176             # If another thread terminated, the current measurement is invalid

    177             # => return the previous one.

    178             if end_event:
    179                 return niters, duration
    180             niters += step
    181             duration = t2 - start_time
    182             if duration >= min_duration:
    183                 end_event.append(None)
    184                 return niters, duration
    185             if t2 - t1 < 0.01:
    186                 # Minimize interference of measurement on overall runtime

    187                 step = step * 3 // 2
    188             elif do_yield:
    189                 # OS scheduling of Python threads is sometimes so bad that we

    190                 # have to force thread switching ourselves, otherwise we get

    191                 # completely useless results.

    192                 _sleep(0.0001)
    193             t1 = t2
    194 
    195 
    196 def run_throughput_test(func, args, nthreads):
    197     assert nthreads >= 1
    198 
    199     # Warm up

    200     func(*args)
    201 
    202     results = []
    203     loop = TimedLoop(func, args)
    204     end_event = []
    205 
    206     if nthreads == 1:
    207         # Pure single-threaded performance, without any switching or

    208         # synchronization overhead.

    209         start_time = time.time()
    210         results.append(loop(start_time, THROUGHPUT_DURATION,
    211                             end_event, do_yield=False))
    212         return results
    213 
    214     started = False
    215     ready_cond = threading.Condition()
    216     start_cond = threading.Condition()
    217     ready = []
    218 
    219     def run():
    220         with ready_cond:
    221             ready.append(None)
    222             ready_cond.notify()
    223         with start_cond:
    224             while not started:
    225                 start_cond.wait()
    226         results.append(loop(start_time, THROUGHPUT_DURATION,
    227                             end_event, do_yield=True))
    228 
    229     threads = []
    230     for i in range(nthreads):
    231         threads.append(threading.Thread(target=run))
    232     for t in threads:
    233         t.setDaemon(True)
    234         t.start()
    235     # We don't want measurements to include thread startup overhead,

    236     # so we arrange for timing to start after all threads are ready.

    237     with ready_cond:
    238         while len(ready) < nthreads:
    239             ready_cond.wait()
    240     with start_cond:
    241         start_time = time.time()
    242         started = True
    243         start_cond.notify(nthreads)
    244     for t in threads:
    245         t.join()
    246 
    247     return results
    248 
    249 def run_throughput_tests(max_threads):
    250     for task in throughput_tasks:
    251         print(task.__doc__)
    252         print()
    253         func, args = task()
    254         nthreads = 1
    255         baseline_speed = None
    256         while nthreads <= max_threads:
    257             results = run_throughput_test(func, args, nthreads)
    258             # Taking the max duration rather than average gives pessimistic

    259             # results rather than optimistic.

    260             speed = sum(r[0] for r in results) / max(r[1] for r in results)
    261             print("threads=%d: %d" % (nthreads, speed), end="")
    262             if baseline_speed is None:
    263                 print(" iterations/s.")
    264                 baseline_speed = speed
    265             else:
    266                 print(" ( %d %%)" % (speed / baseline_speed * 100))
    267             nthreads += 1
    268         print()
    269 
    270 
    271 LAT_END = "END"
    272 
    273 def _sendto(sock, s, addr):
    274     sock.sendto(s.encode('ascii'), addr)
    275 
    276 def _recv(sock, n):
    277     return sock.recv(n).decode('ascii')
    278 
    279 def latency_client(addr, nb_pings, interval):
    280     sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    281     _time = time.time
    282     _sleep = time.sleep
    283     def _ping():
    284         _sendto(sock, "%r\n" % _time(), addr)
    285     # The first ping signals the parent process that we are ready.

    286     _ping()
    287     # We give the parent a bit of time to notice.

    288     _sleep(1.0)
    289     for i in range(nb_pings):
    290         _sleep(interval)
    291         _ping()
    292     _sendto(sock, LAT_END + "\n", addr)
    293 
    294 def run_latency_client(**kwargs):
    295     cmd_line = [sys.executable, '-E', os.path.abspath(__file__)]
    296     cmd_line.extend(['--latclient', repr(kwargs)])
    297     return subprocess.Popen(cmd_line) #, stdin=subprocess.PIPE,

    298                             #stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

    299 
    300 def run_latency_test(func, args, nthreads):
    301     # Create a listening socket to receive the pings. We use UDP which should

    302     # be painlessly cross-platform.

    303     sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    304     sock.bind(("127.0.0.1", 0))
    305     addr = sock.getsockname()
    306 
    307     interval = LATENCY_PING_INTERVAL
    308     duration = LATENCY_DURATION
    309     nb_pings = int(duration / interval)
    310 
    311     results = []
    312     threads = []
    313     end_event = []
    314     start_cond = threading.Condition()
    315     started = False
    316     if nthreads > 0:
    317         # Warm up

    318         func(*args)
    319 
    320         results = []
    321         loop = TimedLoop(func, args)
    322         ready = []
    323         ready_cond = threading.Condition()
    324 
    325         def run():
    326             with ready_cond:
    327                 ready.append(None)
    328                 ready_cond.notify()
    329             with start_cond:
    330                 while not started:
    331                     start_cond.wait()
    332             loop(start_time, duration * 1.5, end_event, do_yield=False)
    333 
    334         for i in range(nthreads):
    335             threads.append(threading.Thread(target=run))
    336         for t in threads:
    337             t.setDaemon(True)
    338             t.start()
    339         # Wait for threads to be ready

    340         with ready_cond:
    341             while len(ready) < nthreads:
    342                 ready_cond.wait()
    343 
    344     # Run the client and wait for the first ping(s) to arrive before

    345     # unblocking the background threads.

    346     chunks = []
    347     process = run_latency_client(addr=sock.getsockname(),
    348                                  nb_pings=nb_pings, interval=interval)
    349     s = _recv(sock, 4096)
    350     _time = time.time
    351 
    352     with start_cond:
    353         start_time = _time()
    354         started = True
    355         start_cond.notify(nthreads)
    356 
    357     while LAT_END not in s:
    358         s = _recv(sock, 4096)
    359         t = _time()
    360         chunks.append((t, s))
    361 
    362     # Tell the background threads to stop.

    363     end_event.append(None)
    364     for t in threads:
    365         t.join()
    366     process.wait()
    367 
    368     for recv_time, chunk in chunks:
    369         # NOTE: it is assumed that a line sent by a client wasn't received

    370         # in two chunks because the lines are very small.

    371         for line in chunk.splitlines():
    372             line = line.strip()
    373             if line and line != LAT_END:
    374                 send_time = eval(line)
    375                 assert isinstance(send_time, float)
    376                 results.append((send_time, recv_time))
    377 
    378     return results
    379 
    380 def run_latency_tests(max_threads):
    381     for task in latency_tasks:
    382         print("Background CPU task:", task.__doc__)
    383         print()
    384         func, args = task()
    385         nthreads = 0
    386         while nthreads <= max_threads:
    387             results = run_latency_test(func, args, nthreads)
    388             n = len(results)
    389             # We print out milliseconds

    390             lats = [1000 * (t2 - t1) for (t1, t2) in results]
    391             #print(list(map(int, lats)))

    392             avg = sum(lats) / n
    393             dev = (sum((x - avg) ** 2 for x in lats) / n) ** 0.5
    394             print("CPU threads=%d: %d ms. (std dev: %d ms.)" % (nthreads, avg, dev), end="")
    395             print()
    396             #print("    [... from %d samples]" % n)

    397             nthreads += 1
    398         print()
    399 
    400 
    401 BW_END = "END"
    402 
    403 def bandwidth_client(addr, packet_size, duration):
    404     sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    405     sock.bind(("127.0.0.1", 0))
    406     local_addr = sock.getsockname()
    407     _time = time.time
    408     _sleep = time.sleep
    409     def _send_chunk(msg):
    410         _sendto(sock, ("%r#%s\n" % (local_addr, msg)).rjust(packet_size), addr)
    411     # We give the parent some time to be ready.

    412     _sleep(1.0)
    413     try:
    414         start_time = _time()
    415         end_time = start_time + duration * 2.0
    416         i = 0
    417         while _time() < end_time:
    418             _send_chunk(str(i))
    419             s = _recv(sock, packet_size)
    420             assert len(s) == packet_size
    421             i += 1
    422         _send_chunk(BW_END)
    423     finally:
    424         sock.close()
    425 
    426 def run_bandwidth_client(**kwargs):
    427     cmd_line = [sys.executable, '-E', os.path.abspath(__file__)]
    428     cmd_line.extend(['--bwclient', repr(kwargs)])
    429     return subprocess.Popen(cmd_line) #, stdin=subprocess.PIPE,

    430                             #stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

    431 
    432 def run_bandwidth_test(func, args, nthreads):
    433     # Create a listening socket to receive the packets. We use UDP which should

    434     # be painlessly cross-platform.

    435     sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    436     sock.bind(("127.0.0.1", 0))
    437     addr = sock.getsockname()
    438 
    439     duration = BANDWIDTH_DURATION
    440     packet_size = BANDWIDTH_PACKET_SIZE
    441 
    442     results = []
    443     threads = []
    444     end_event = []
    445     start_cond = threading.Condition()
    446     started = False
    447     if nthreads > 0:
    448         # Warm up

    449         func(*args)
    450 
    451         results = []
    452         loop = TimedLoop(func, args)
    453         ready = []
    454         ready_cond = threading.Condition()
    455 
    456         def run():
    457             with ready_cond:
    458                 ready.append(None)
    459                 ready_cond.notify()
    460             with start_cond:
    461                 while not started:
    462                     start_cond.wait()
    463             loop(start_time, duration * 1.5, end_event, do_yield=False)
    464 
    465         for i in range(nthreads):
    466             threads.append(threading.Thread(target=run))
    467         for t in threads:
    468             t.setDaemon(True)
    469             t.start()
    470         # Wait for threads to be ready

    471         with ready_cond:
    472             while len(ready) < nthreads:
    473                 ready_cond.wait()
    474 
    475     # Run the client and wait for the first packet to arrive before

    476     # unblocking the background threads.

    477     process = run_bandwidth_client(addr=addr,
    478                                    packet_size=packet_size,
    479                                    duration=duration)
    480     _time = time.time
    481     # This will also wait for the parent to be ready

    482     s = _recv(sock, packet_size)
    483     remote_addr = eval(s.partition('#')[0])
    484 
    485     with start_cond:
    486         start_time = _time()
    487         started = True
    488         start_cond.notify(nthreads)
    489 
    490     n = 0
    491     first_time = None
    492     while not end_event and BW_END not in s:
    493         _sendto(sock, s, remote_addr)
    494         s = _recv(sock, packet_size)
    495         if first_time is None:
    496             first_time = _time()
    497         n += 1
    498     end_time = _time()
    499 
    500     end_event.append(None)
    501     for t in threads:
    502         t.join()
    503     process.kill()
    504 
    505     return (n - 1) / (end_time - first_time)
    506 
    507 def run_bandwidth_tests(max_threads):
    508     for task in bandwidth_tasks:
    509         print("Background CPU task:", task.__doc__)
    510         print()
    511         func, args = task()
    512         nthreads = 0
    513         baseline_speed = None
    514         while nthreads <= max_threads:
    515             results = run_bandwidth_test(func, args, nthreads)
    516             speed = results
    517             #speed = len(results) * 1.0 / results[-1][0]

    518             print("CPU threads=%d: %.1f" % (nthreads, speed), end="")
    519             if baseline_speed is None:
    520                 print(" packets/s.")
    521                 baseline_speed = speed
    522             else:
    523                 print(" ( %d %%)" % (speed / baseline_speed * 100))
    524             nthreads += 1
    525         print()
    526 
    527 
    528 def main():
    529     usage = "usage: %prog [-h|--help] [options]"
    530     parser = OptionParser(usage=usage)
    531     parser.add_option("-t", "--throughput",
    532                       action="store_true", dest="throughput", default=False,
    533                       help="run throughput tests")
    534     parser.add_option("-l", "--latency",
    535                       action="store_true", dest="latency", default=False,
    536                       help="run latency tests")
    537     parser.add_option("-b", "--bandwidth",
    538                       action="store_true", dest="bandwidth", default=False,
    539                       help="run I/O bandwidth tests")
    540     parser.add_option("-i", "--interval",
    541                       action="store", type="int", dest="check_interval", default=None,
    542                       help="sys.setcheckinterval() value")
    543     parser.add_option("-I", "--switch-interval",
    544                       action="store", type="float", dest="switch_interval", default=None,
    545                       help="sys.setswitchinterval() value")
    546     parser.add_option("-n", "--num-threads",
    547                       action="store", type="int", dest="nthreads", default=4,
    548                       help="max number of threads in tests")
    549 
    550     # Hidden option to run the pinging and bandwidth clients

    551     parser.add_option("", "--latclient",
    552                       action="store", dest="latclient", default=None,
    553                       help=SUPPRESS_HELP)
    554     parser.add_option("", "--bwclient",
    555                       action="store", dest="bwclient", default=None,
    556                       help=SUPPRESS_HELP)
    557 
    558     options, args = parser.parse_args()
    559     if args:
    560         parser.error("unexpected arguments")
    561 
    562     if options.latclient:
    563         kwargs = eval(options.latclient)
    564         latency_client(**kwargs)
    565         return
    566 
    567     if options.bwclient:
    568         kwargs = eval(options.bwclient)
    569         bandwidth_client(**kwargs)
    570         return
    571 
    572     if not options.throughput and not options.latency and not options.bandwidth:
    573         options.throughput = options.latency = options.bandwidth = True
    574     if options.check_interval:
    575         sys.setcheckinterval(options.check_interval)
    576     if options.switch_interval:
    577         sys.setswitchinterval(options.switch_interval)
    578 
    579     print("== %s %s (%s) ==" % (
    580         platform.python_implementation(),
    581         platform.python_version(),
    582         platform.python_build()[0],
    583     ))
    584     # Processor identification often has repeated spaces

    585     cpu = ' '.join(platform.processor().split())
    586     print("== %s %s on '%s' ==" % (
    587         platform.machine(),
    588         platform.system(),
    589         cpu,
    590     ))
    591     print()
    592 
    593     if options.throughput:
    594         print("--- Throughput ---")
    595         print()
    596         run_throughput_tests(options.nthreads)
    597 
    598     if options.latency:
    599         print("--- Latency ---")
    600         print()
    601         run_latency_tests(options.nthreads)
    602 
    603     if options.bandwidth:
    604         print("--- I/O bandwidth ---")
    605         print()
    606         run_bandwidth_tests(options.nthreads)
    607 
    608 if __name__ == "__main__":
    609     main()
    610