Home | History | Annotate | Download | only in deps
      1 #!/usr/bin/env python
      2 
      3 """
      4 Heartbeat server/client to detect soft lockups
      5 """
      6 
      7 import socket, os, sys, time, getopt
      8 
      9 def daemonize(output_file):
     10     try:
     11         pid = os.fork()
     12     except OSError, e:
     13         raise Exception, "error %d: %s" % (e.strerror, e.errno)
     14 
     15     if pid:
     16         os._exit(0)
     17 
     18     os.umask(0)
     19     os.setsid()
     20     sys.stdout.flush()
     21     sys.stderr.flush()
     22 
     23     if file:
     24         output_handle = file(output_file, 'a+', 0)
     25         # autoflush stdout/stderr
     26         sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
     27         sys.stderr = os.fdopen(sys.stderr.fileno(), 'w', 0)
     28     else:
     29         output_handle = file('/dev/null', 'a+')
     30 
     31     stdin_handle = open('/dev/null', 'r')
     32     os.dup2(output_handle.fileno(), sys.stdout.fileno())
     33     os.dup2(output_handle.fileno(), sys.stderr.fileno())
     34     os.dup2(stdin_handle.fileno(), sys.stdin.fileno())
     35 
     36 def recv_all(sock):
     37     total_data = []
     38     while True:
     39         data = sock.recv(1024)
     40         if not data:
     41             break
     42         total_data.append(data)
     43     return ''.join(total_data)
     44 
     45 def run_server(host, port, daemon, file, queue_size, threshold, drift):
     46     if daemon:
     47         daemonize(output_file=file)
     48     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     49     sock.bind((host, port))
     50     sock.listen(queue_size)
     51     timeout_interval = threshold * 2
     52     prev_check_timestamp = float(time.time())
     53     while 1:
     54         c_sock, c_addr = sock.accept()
     55         heartbeat = recv_all(c_sock)
     56         local_timestamp = float(time.time())
     57         drift = check_heartbeat(heartbeat, local_timestamp, threshold, check_drift)
     58         # NOTE: this doesn't work if the only client is the one that timed
     59         # out, but anything more complete would require another thread and
     60         # a lock for client_prev_timestamp.
     61         if local_timestamp - prev_check_timestamp > threshold * 2.0:
     62             check_for_timeouts(threshold, check_drift)
     63             prev_check_timestamp = local_timestamp
     64         if verbose:
     65             if check_drift:
     66                 print "%.2f: %s (%s)" % (local_timestamp, heartbeat, drift)
     67             else:
     68                 print "%.2f: %s" % (local_timestamp, heartbeat)
     69 
     70 def run_client(host, port, daemon, file, interval):
     71     if daemon:
     72         daemonize(output_file=file)
     73     seq = 1
     74     while 1:
     75         try:
     76             sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     77             sock.connect((host, port))
     78             heartbeat = get_heartbeat(seq)
     79             sock.sendall(heartbeat)
     80             sock.close()
     81             if verbose:
     82                 print heartbeat
     83         except socket.error, (value, message):
     84             print "%.2f: ERROR, %d - %s" % (float(time.time()), value, message)
     85 
     86         seq += 1
     87         time.sleep(interval)
     88 
     89 def get_heartbeat(seq=1):
     90     return "%s %06d %.2f" % (hostname, seq, float(time.time()))
     91 
     92 def check_heartbeat(heartbeat, local_timestamp, threshold, check_drift):
     93     hostname, seq, timestamp = heartbeat.rsplit()
     94     timestamp = float(timestamp)
     95     if client_prev_timestamp.has_key(hostname):
     96         delta = local_timestamp - client_prev_timestamp[hostname]
     97         if delta > threshold:
     98             print "%.2f: ALERT, SLU detected on host %s, delta %ds" \
     99                 % (float(time.time()), hostname, delta)
    100 
    101     client_prev_timestamp[hostname] = local_timestamp
    102 
    103     if check_drift:
    104         if not client_clock_offset.has_key(hostname):
    105             client_clock_offset[hostname] = timestamp - local_timestamp
    106             client_prev_drift[hostname] = 0
    107         drift = timestamp - local_timestamp - client_clock_offset[hostname]
    108         drift_delta = drift - client_prev_drift[hostname]
    109         client_prev_drift[hostname] = drift
    110         return "drift %+4.2f (%+4.2f)" % (drift, drift_delta)
    111 
    112 def check_for_timeouts(threshold, check_drift):
    113     local_timestamp = float(time.time())
    114     hostname_list = list(client_prev_timestamp)
    115     for hostname in hostname_list:
    116         timestamp = client_prev_timestamp[hostname]
    117         delta = local_timestamp - timestamp
    118         if delta > threshold * 2:
    119             print "%.2f: ALERT, SLU detected on host %s, no heartbeat for %ds" \
    120                 % (local_timestamp, hostname, delta)
    121             del client_prev_timestamp[hostname]
    122             if check_drift:
    123                 del client_clock_offset[hostname]
    124                 del client_prev_drift[hostname]
    125 
    126 def usage():
    127     print """
    128 Usage:
    129 
    130     heartbeat_slu.py --server --address <bind_address> --port <bind_port>
    131                      [--file <output_file>] [--no-daemon] [--verbose]
    132                      [--threshold <heartbeat threshold>]
    133 
    134     heartbeat_slu.py --client --address <server_address> -p <server_port>
    135                      [--file output_file] [--no-daemon] [--verbose]
    136                      [--interval <heartbeat interval in seconds>]
    137 """
    138 
    139 # host information and global data
    140 hostname = socket.gethostname()
    141 client_prev_timestamp = {}
    142 client_clock_offset = {}
    143 client_prev_drift = {}
    144 
    145 # default param values
    146 host_port = 9001
    147 host_address = ''
    148 interval = 1 # seconds between heartbeats
    149 threshold = 10 # seconds late till alert
    150 is_server = False
    151 is_daemon = True
    152 file_server = "/tmp/heartbeat_server.out"
    153 file_client = "/tmp/heartbeat_client.out"
    154 file_selected = None
    155 queue_size = 5
    156 verbose = False
    157 check_drift = False
    158 
    159 # process cmdline opts
    160 try:
    161     opts, args = getopt.getopt(sys.argv[1:], "vhsfd:p:a:i:t:", [
    162                     "server", "client", "no-daemon", "address=", "port=",
    163                     "file=", "server", "interval=", "threshold=", "verbose",
    164                     "check-drift", "help"])
    165 except getopt.GetoptError, e:
    166     print "error: %s" % str(e)
    167     usage()
    168     exit(1)
    169 
    170 for param, value in opts:
    171     if param in ["-p", "--port"]:
    172         host_port = int(value)
    173     elif param in ["-a", "--address"]:
    174         host_address = value
    175     elif param in ["-s", "--server"]:
    176         is_server = True
    177     elif param in ["-c", "--client"]:
    178         is_server = False
    179     elif param in ["--no-daemon"]:
    180         is_daemon = False
    181     elif param in ["-f", "--file"]:
    182         file_selected = value
    183     elif param in ["-i", "--interval"]:
    184         interval = int(value)
    185     elif param in ["-t", "--threshold"]:
    186         threshold = int(value)
    187     elif param in ["-d", "--check-drift"]:
    188         check_drift = True
    189     elif param in ["-v", "--verbose"]:
    190         verbose = True
    191     elif param in ["-h", "--help"]:
    192         usage()
    193         exit(0)
    194     else:
    195         print "error: unrecognized option: %s" % value
    196         usage()
    197         exit(1)
    198 
    199 # run until we're terminated
    200 if is_server:
    201     file_server = file_selected or file_server
    202     run_server(host_address, host_port, is_daemon, file_server, queue_size, threshold, check_drift)
    203 else:
    204     file_client = file_selected or file_client
    205     run_client(host_address, host_port, is_daemon, file_client, interval)
    206