Home | History | Annotate | Download | only in ccbench
      1 # This file should be kept compatible with both Python 2.6 and Python >= 3.0.
      2 
      3 from __future__ import division
      4 from __future__ import print_function
      5 
      6 """
      7 ccbench, a Python concurrency benchmark.
      8 """
      9 
     10 import time
     11 import os
     12 import sys
     13 import itertools
     14 import threading
     15 import subprocess
     16 import socket
     17 from optparse import OptionParser, SUPPRESS_HELP
     18 import platform
     19 
# Compatibility
# Make the names `xrange` and `map` usable on both Python 2 and Python 3.
try:
    # Probe for the name; it only exists as a builtin on Python 2.
    xrange
except NameError:
    # No such builtin here: alias it to range.
    xrange = range

try:
    # Where itertools.imap exists, use it as this module's `map`.
    map = itertools.imap
except AttributeError:
    # itertools has no imap here: keep using the builtin map.
    pass
     30 
     31 
# Minimum duration passed to each timed loop by run_throughput_test().
# Compared against time.time() differences, i.e. expressed in seconds.
THROUGHPUT_DURATION = 2.0

# Delay the latency client sleeps before each ping.
LATENCY_PING_INTERVAL = 0.1
# The number of pings is LATENCY_DURATION / LATENCY_PING_INTERVAL; background
# loops in the latency test are asked to run for 1.5 times this duration.
LATENCY_DURATION = 2.0

# Size each bandwidth packet is padded to, and the size passed to recv().
BANDWIDTH_PACKET_SIZE = 1024
# The bandwidth client keeps sending for twice this duration; background
# loops in the bandwidth test are asked to run for 1.5 times this duration.
BANDWIDTH_DURATION = 2.0
     39 
     40 
     41 def task_pidigits():
     42     """Pi calculation (Python)"""
     43     _map = map
     44     _count = itertools.count
     45     _islice = itertools.islice
     46 
     47     def calc_ndigits(n):
     48         # From http://shootout.alioth.debian.org/
     49         def gen_x():
     50             return _map(lambda k: (k, 4*k + 2, 0, 2*k + 1), _count(1))
     51 
     52         def compose(a, b):
     53             aq, ar, as_, at = a
     54             bq, br, bs, bt = b
     55             return (aq * bq,
     56                     aq * br + ar * bt,
     57                     as_ * bq + at * bs,
     58                     as_ * br + at * bt)
     59 
     60         def extract(z, j):
     61             q, r, s, t = z
     62             return (q*j + r) // (s*j + t)
     63 
     64         def pi_digits():
     65             z = (1, 0, 0, 1)
     66             x = gen_x()
     67             while 1:
     68                 y = extract(z, 3)
     69                 while y != extract(z, 4):
     70                     z = compose(z, next(x))
     71                     y = extract(z, 3)
     72                 z = compose((10, -10*y, 0, 1), z)
     73                 yield y
     74 
     75         return list(_islice(pi_digits(), n))
     76 
     77     return calc_ndigits, (50, )
     78 
     79 def task_regex():
     80     """regular expression (C)"""
     81     # XXX this task gives horrendous latency results.
     82     import re
     83     # Taken from the `inspect` module
     84     pat = re.compile(r'^(\s*def\s)|(.*(?<!\w)lambda(:|\s))|^(\s*@)', re.MULTILINE)
     85     with open(__file__, "r") as f:
     86         arg = f.read(2000)
     87 
     88     def findall(s):
     89         t = time.time()
     90         try:
     91             return pat.findall(s)
     92         finally:
     93             print(time.time() - t)
     94     return pat.findall, (arg, )
     95 
     96 def task_sort():
     97     """list sorting (C)"""
     98     def list_sort(l):
     99         l = l[::-1]
    100         l.sort()
    101 
    102     return list_sort, (list(range(1000)), )
    103 
    104 def task_compress_zlib():
    105     """zlib compression (C)"""
    106     import zlib
    107     with open(__file__, "rb") as f:
    108         arg = f.read(5000) * 3
    109 
    110     def compress(s):
    111         zlib.decompress(zlib.compress(s, 5))
    112     return compress, (arg, )
    113 
    114 def task_compress_bz2():
    115     """bz2 compression (C)"""
    116     import bz2
    117     with open(__file__, "rb") as f:
    118         arg = f.read(3000) * 2
    119 
    120     def compress(s):
    121         bz2.compress(s)
    122     return compress, (arg, )
    123 
    124 def task_hashing():
    125     """SHA1 hashing (C)"""
    126     import hashlib
    127     with open(__file__, "rb") as f:
    128         arg = f.read(5000) * 30
    129 
    130     def compute(s):
    131         hashlib.sha1(s).digest()
    132     return compute, (arg, )
    133 
    134 
    135 throughput_tasks = [task_pidigits, task_regex]
    136 for mod in 'bz2', 'hashlib':
    137     try:
    138         globals()[mod] = __import__(mod)
    139     except ImportError:
    140         globals()[mod] = None
    141 
    142 # For whatever reasons, zlib gives irregular results, so we prefer bz2 or
    143 # hashlib if available.
    144 # (NOTE: hashlib releases the GIL from 2.7 and 3.1 onwards)
    145 if bz2 is not None:
    146     throughput_tasks.append(task_compress_bz2)
    147 elif hashlib is not None:
    148     throughput_tasks.append(task_hashing)
    149 else:
    150     throughput_tasks.append(task_compress_zlib)
    151 
    152 latency_tasks = throughput_tasks
    153 bandwidth_tasks = [task_pidigits]
    154 
    155 
    156 class TimedLoop:
    157     def __init__(self, func, args):
    158         self.func = func
    159         self.args = args
    160 
    161     def __call__(self, start_time, min_duration, end_event, do_yield=False):
    162         step = 20
    163         niters = 0
    164         duration = 0.0
    165         _time = time.time
    166         _sleep = time.sleep
    167         _func = self.func
    168         _args = self.args
    169         t1 = start_time
    170         while True:
    171             for i in range(step):
    172                 _func(*_args)
    173             t2 = _time()
    174             # If another thread terminated, the current measurement is invalid
    175             # => return the previous one.
    176             if end_event:
    177                 return niters, duration
    178             niters += step
    179             duration = t2 - start_time
    180             if duration >= min_duration:
    181                 end_event.append(None)
    182                 return niters, duration
    183             if t2 - t1 < 0.01:
    184                 # Minimize interference of measurement on overall runtime
    185                 step = step * 3 // 2
    186             elif do_yield:
    187                 # OS scheduling of Python threads is sometimes so bad that we
    188                 # have to force thread switching ourselves, otherwise we get
    189                 # completely useless results.
    190                 _sleep(0.0001)
    191             t1 = t2
    192 
    193 
    194 def run_throughput_test(func, args, nthreads):
    195     assert nthreads >= 1
    196 
    197     # Warm up
    198     func(*args)
    199 
    200     results = []
    201     loop = TimedLoop(func, args)
    202     end_event = []
    203 
    204     if nthreads == 1:
    205         # Pure single-threaded performance, without any switching or
    206         # synchronization overhead.
    207         start_time = time.time()
    208         results.append(loop(start_time, THROUGHPUT_DURATION,
    209                             end_event, do_yield=False))
    210         return results
    211 
    212     started = False
    213     ready_cond = threading.Condition()
    214     start_cond = threading.Condition()
    215     ready = []
    216 
    217     def run():
    218         with ready_cond:
    219             ready.append(None)
    220             ready_cond.notify()
    221         with start_cond:
    222             while not started:
    223                 start_cond.wait()
    224         results.append(loop(start_time, THROUGHPUT_DURATION,
    225                             end_event, do_yield=True))
    226 
    227     threads = []
    228     for i in range(nthreads):
    229         threads.append(threading.Thread(target=run))
    230     for t in threads:
    231         t.setDaemon(True)
    232         t.start()
    233     # We don't want measurements to include thread startup overhead,
    234     # so we arrange for timing to start after all threads are ready.
    235     with ready_cond:
    236         while len(ready) < nthreads:
    237             ready_cond.wait()
    238     with start_cond:
    239         start_time = time.time()
    240         started = True
    241         start_cond.notify(nthreads)
    242     for t in threads:
    243         t.join()
    244 
    245     return results
    246 
    247 def run_throughput_tests(max_threads):
    248     for task in throughput_tasks:
    249         print(task.__doc__)
    250         print()
    251         func, args = task()
    252         nthreads = 1
    253         baseline_speed = None
    254         while nthreads <= max_threads:
    255             results = run_throughput_test(func, args, nthreads)
    256             # Taking the max duration rather than average gives pessimistic
    257             # results rather than optimistic.
    258             speed = sum(r[0] for r in results) / max(r[1] for r in results)
    259             print("threads=%d: %d" % (nthreads, speed), end="")
    260             if baseline_speed is None:
    261                 print(" iterations/s.")
    262                 baseline_speed = speed
    263             else:
    264                 print(" ( %d %%)" % (speed / baseline_speed * 100))
    265             nthreads += 1
    266         print()
    267 
    268 
# Marker the latency client sends after its last ping; the parent stops
# receiving once a chunk contains it.
LAT_END = "END"
    270 
    271 def _sendto(sock, s, addr):
    272     sock.sendto(s.encode('ascii'), addr)
    273 
    274 def _recv(sock, n):
    275     return sock.recv(n).decode('ascii')
    276 
    277 def latency_client(addr, nb_pings, interval):
    278     sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    279     try:
    280         _time = time.time
    281         _sleep = time.sleep
    282         def _ping():
    283             _sendto(sock, "%r\n" % _time(), addr)
    284         # The first ping signals the parent process that we are ready.
    285         _ping()
    286         # We give the parent a bit of time to notice.
    287         _sleep(1.0)
    288         for i in range(nb_pings):
    289             _sleep(interval)
    290             _ping()
    291         _sendto(sock, LAT_END + "\n", addr)
    292     finally:
    293         sock.close()
    294 
    295 def run_latency_client(**kwargs):
    296     cmd_line = [sys.executable, '-E', os.path.abspath(__file__)]
    297     cmd_line.extend(['--latclient', repr(kwargs)])
    298     return subprocess.Popen(cmd_line) #, stdin=subprocess.PIPE,
    299                             #stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    300 
    301 def run_latency_test(func, args, nthreads):
    302     # Create a listening socket to receive the pings. We use UDP which should
    303     # be painlessly cross-platform.
    304     sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    305     sock.bind(("127.0.0.1", 0))
    306     addr = sock.getsockname()
    307 
    308     interval = LATENCY_PING_INTERVAL
    309     duration = LATENCY_DURATION
    310     nb_pings = int(duration / interval)
    311 
    312     results = []
    313     threads = []
    314     end_event = []
    315     start_cond = threading.Condition()
    316     started = False
    317     if nthreads > 0:
    318         # Warm up
    319         func(*args)
    320 
    321         results = []
    322         loop = TimedLoop(func, args)
    323         ready = []
    324         ready_cond = threading.Condition()
    325 
    326         def run():
    327             with ready_cond:
    328                 ready.append(None)
    329                 ready_cond.notify()
    330             with start_cond:
    331                 while not started:
    332                     start_cond.wait()
    333             loop(start_time, duration * 1.5, end_event, do_yield=False)
    334 
    335         for i in range(nthreads):
    336             threads.append(threading.Thread(target=run))
    337         for t in threads:
    338             t.setDaemon(True)
    339             t.start()
    340         # Wait for threads to be ready
    341         with ready_cond:
    342             while len(ready) < nthreads:
    343                 ready_cond.wait()
    344 
    345     # Run the client and wait for the first ping(s) to arrive before
    346     # unblocking the background threads.
    347     chunks = []
    348     process = run_latency_client(addr=sock.getsockname(),
    349                                  nb_pings=nb_pings, interval=interval)
    350     s = _recv(sock, 4096)
    351     _time = time.time
    352 
    353     with start_cond:
    354         start_time = _time()
    355         started = True
    356         start_cond.notify(nthreads)
    357 
    358     while LAT_END not in s:
    359         s = _recv(sock, 4096)
    360         t = _time()
    361         chunks.append((t, s))
    362 
    363     # Tell the background threads to stop.
    364     end_event.append(None)
    365     for t in threads:
    366         t.join()
    367     process.wait()
    368     sock.close()
    369 
    370     for recv_time, chunk in chunks:
    371         # NOTE: it is assumed that a line sent by a client wasn't received
    372         # in two chunks because the lines are very small.
    373         for line in chunk.splitlines():
    374             line = line.strip()
    375             if line and line != LAT_END:
    376                 send_time = eval(line)
    377                 assert isinstance(send_time, float)
    378                 results.append((send_time, recv_time))
    379 
    380     return results
    381 
    382 def run_latency_tests(max_threads):
    383     for task in latency_tasks:
    384         print("Background CPU task:", task.__doc__)
    385         print()
    386         func, args = task()
    387         nthreads = 0
    388         while nthreads <= max_threads:
    389             results = run_latency_test(func, args, nthreads)
    390             n = len(results)
    391             # We print out milliseconds
    392             lats = [1000 * (t2 - t1) for (t1, t2) in results]
    393             #print(list(map(int, lats)))
    394             avg = sum(lats) / n
    395             dev = (sum((x - avg) ** 2 for x in lats) / n) ** 0.5
    396             print("CPU threads=%d: %d ms. (std dev: %d ms.)" % (nthreads, avg, dev), end="")
    397             print()
    398             #print("    [... from %d samples]" % n)
    399             nthreads += 1
    400         print()
    401 
    402 
# Message the bandwidth client sends in its final packet; the parent stops
# echoing once a received packet contains it.
BW_END = "END"
    404 
    405 def bandwidth_client(addr, packet_size, duration):
    406     sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    407     sock.bind(("127.0.0.1", 0))
    408     local_addr = sock.getsockname()
    409     _time = time.time
    410     _sleep = time.sleep
    411     def _send_chunk(msg):
    412         _sendto(sock, ("%r#%s\n" % (local_addr, msg)).rjust(packet_size), addr)
    413     # We give the parent some time to be ready.
    414     _sleep(1.0)
    415     try:
    416         start_time = _time()
    417         end_time = start_time + duration * 2.0
    418         i = 0
    419         while _time() < end_time:
    420             _send_chunk(str(i))
    421             s = _recv(sock, packet_size)
    422             assert len(s) == packet_size
    423             i += 1
    424         _send_chunk(BW_END)
    425     finally:
    426         sock.close()
    427 
    428 def run_bandwidth_client(**kwargs):
    429     cmd_line = [sys.executable, '-E', os.path.abspath(__file__)]
    430     cmd_line.extend(['--bwclient', repr(kwargs)])
    431     return subprocess.Popen(cmd_line) #, stdin=subprocess.PIPE,
    432                             #stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    433 
    434 def run_bandwidth_test(func, args, nthreads):
    435     # Create a listening socket to receive the packets. We use UDP which should
    436     # be painlessly cross-platform.
    437     with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
    438         sock.bind(("127.0.0.1", 0))
    439         addr = sock.getsockname()
    440 
    441         duration = BANDWIDTH_DURATION
    442         packet_size = BANDWIDTH_PACKET_SIZE
    443 
    444         results = []
    445         threads = []
    446         end_event = []
    447         start_cond = threading.Condition()
    448         started = False
    449         if nthreads > 0:
    450             # Warm up
    451             func(*args)
    452 
    453             results = []
    454             loop = TimedLoop(func, args)
    455             ready = []
    456             ready_cond = threading.Condition()
    457 
    458             def run():
    459                 with ready_cond:
    460                     ready.append(None)
    461                     ready_cond.notify()
    462                 with start_cond:
    463                     while not started:
    464                         start_cond.wait()
    465                 loop(start_time, duration * 1.5, end_event, do_yield=False)
    466 
    467             for i in range(nthreads):
    468                 threads.append(threading.Thread(target=run))
    469             for t in threads:
    470                 t.setDaemon(True)
    471                 t.start()
    472             # Wait for threads to be ready
    473             with ready_cond:
    474                 while len(ready) < nthreads:
    475                     ready_cond.wait()
    476 
    477         # Run the client and wait for the first packet to arrive before
    478         # unblocking the background threads.
    479         process = run_bandwidth_client(addr=addr,
    480                                        packet_size=packet_size,
    481                                        duration=duration)
    482         _time = time.time
    483         # This will also wait for the parent to be ready
    484         s = _recv(sock, packet_size)
    485         remote_addr = eval(s.partition('#')[0])
    486 
    487         with start_cond:
    488             start_time = _time()
    489             started = True
    490             start_cond.notify(nthreads)
    491 
    492         n = 0
    493         first_time = None
    494         while not end_event and BW_END not in s:
    495             _sendto(sock, s, remote_addr)
    496             s = _recv(sock, packet_size)
    497             if first_time is None:
    498                 first_time = _time()
    499             n += 1
    500         end_time = _time()
    501 
    502     end_event.append(None)
    503     for t in threads:
    504         t.join()
    505     process.kill()
    506 
    507     return (n - 1) / (end_time - first_time)
    508 
    509 def run_bandwidth_tests(max_threads):
    510     for task in bandwidth_tasks:
    511         print("Background CPU task:", task.__doc__)
    512         print()
    513         func, args = task()
    514         nthreads = 0
    515         baseline_speed = None
    516         while nthreads <= max_threads:
    517             results = run_bandwidth_test(func, args, nthreads)
    518             speed = results
    519             #speed = len(results) * 1.0 / results[-1][0]
    520             print("CPU threads=%d: %.1f" % (nthreads, speed), end="")
    521             if baseline_speed is None:
    522                 print(" packets/s.")
    523                 baseline_speed = speed
    524             else:
    525                 print(" ( %d %%)" % (speed / baseline_speed * 100))
    526             nthreads += 1
    527         print()
    528 
    529 
    530 def main():
    531     usage = "usage: %prog [-h|--help] [options]"
    532     parser = OptionParser(usage=usage)
    533     parser.add_option("-t", "--throughput",
    534                       action="store_true", dest="throughput", default=False,
    535                       help="run throughput tests")
    536     parser.add_option("-l", "--latency",
    537                       action="store_true", dest="latency", default=False,
    538                       help="run latency tests")
    539     parser.add_option("-b", "--bandwidth",
    540                       action="store_true", dest="bandwidth", default=False,
    541                       help="run I/O bandwidth tests")
    542     parser.add_option("-i", "--interval",
    543                       action="store", type="int", dest="check_interval", default=None,
    544                       help="sys.setcheckinterval() value")
    545     parser.add_option("-I", "--switch-interval",
    546                       action="store", type="float", dest="switch_interval", default=None,
    547                       help="sys.setswitchinterval() value")
    548     parser.add_option("-n", "--num-threads",
    549                       action="store", type="int", dest="nthreads", default=4,
    550                       help="max number of threads in tests")
    551 
    552     # Hidden option to run the pinging and bandwidth clients
    553     parser.add_option("", "--latclient",
    554                       action="store", dest="latclient", default=None,
    555                       help=SUPPRESS_HELP)
    556     parser.add_option("", "--bwclient",
    557                       action="store", dest="bwclient", default=None,
    558                       help=SUPPRESS_HELP)
    559 
    560     options, args = parser.parse_args()
    561     if args:
    562         parser.error("unexpected arguments")
    563 
    564     if options.latclient:
    565         kwargs = eval(options.latclient)
    566         latency_client(**kwargs)
    567         return
    568 
    569     if options.bwclient:
    570         kwargs = eval(options.bwclient)
    571         bandwidth_client(**kwargs)
    572         return
    573 
    574     if not options.throughput and not options.latency and not options.bandwidth:
    575         options.throughput = options.latency = options.bandwidth = True
    576     if options.check_interval:
    577         sys.setcheckinterval(options.check_interval)
    578     if options.switch_interval:
    579         sys.setswitchinterval(options.switch_interval)
    580 
    581     print("== %s %s (%s) ==" % (
    582         platform.python_implementation(),
    583         platform.python_version(),
    584         platform.python_build()[0],
    585     ))
    586     # Processor identification often has repeated spaces
    587     cpu = ' '.join(platform.processor().split())
    588     print("== %s %s on '%s' ==" % (
    589         platform.machine(),
    590         platform.system(),
    591         cpu,
    592     ))
    593     print()
    594 
    595     if options.throughput:
    596         print("--- Throughput ---")
    597         print()
    598         run_throughput_tests(options.nthreads)
    599 
    600     if options.latency:
    601         print("--- Latency ---")
    602         print()
    603         run_latency_tests(options.nthreads)
    604 
    605     if options.bandwidth:
    606         print("--- I/O bandwidth ---")
    607         print()
    608         run_bandwidth_tests(options.nthreads)
    609 
# Run the command-line interface only when executed as a script; the
# benchmark's child processes re-run this file with a hidden option.
if __name__ == "__main__":
    main()
    612