Home | History | Annotate | Download | only in python
      1 #!/usr/bin/env python
      2 # Copyright (c) Sasha Goldshtein
      3 # Licensed under the Apache License, Version 2.0 (the "License")
      4 
      5 import os
      6 import subprocess
      7 from bcc import SymbolCache, BPF
      8 from unittest import main, TestCase
      9 
     10 class TestKSyms(TestCase):
     11     def grab_sym(self):
     12         address = ""
     13         aliases = []
     14 
     15         # Grab the first symbol in kallsyms that has type 't' or 'T'.
     16         # Also, find all aliases of this symbol which are identifiable
     17         # by the same address.
     18         with open("/proc/kallsyms", "rb") as f:
     19             for line in f:
     20 
     21                 # Extract the first 3 columns only. The 4th column
     22                 # containing the module name may not exist for all
     23                 # symbols.
     24                 (addr, t, name) = line.strip().split()[:3]
     25                 if t == b"t" or t == b"T":
     26                     if not address:
     27                         address = addr
     28                     if addr == address:
     29                         aliases.append(name)
     30 
     31         # Return all aliases of the first symbol.
     32         return (address, aliases)
     33 
     34     def test_ksymname(self):
     35         sym = BPF.ksymname(b"__kmalloc")
     36         self.assertIsNotNone(sym)
     37         self.assertNotEqual(sym, 0)
     38 
     39     def test_ksym(self):
     40         (addr, aliases) = self.grab_sym()
     41         sym = BPF.ksym(int(addr, 16))
     42         found = sym in aliases
     43         self.assertTrue(found)
     44 
     45 class Harness(TestCase):
     46     def setUp(self):
     47         self.build_command()
     48         subprocess.check_output('objcopy --only-keep-debug dummy dummy.debug'
     49                                 .split())
     50         self.debug_command()
     51         subprocess.check_output('strip dummy'.split())
     52         self.process = subprocess.Popen('./dummy', stdout=subprocess.PIPE)
     53         # The process prints out the address of some symbol, which we then
     54         # try to resolve in the test.
     55         self.addr = int(self.process.stdout.readline().strip(), 16)
     56         self.syms = SymbolCache(self.process.pid)
     57 
     58     def tearDown(self):
     59         self.process.kill()
     60         self.process.wait()
     61         self.process.stdout.close()
     62         self.process = None
     63 
     64     def resolve_addr(self):
     65         sym, offset, module = self.syms.resolve(self.addr, False)
     66         self.assertEqual(sym, self.mangled_name)
     67         self.assertEqual(offset, 0)
     68         self.assertTrue(module[-5:] == b'dummy')
     69         sym, offset, module = self.syms.resolve(self.addr, True)
     70         self.assertEqual(sym, b'some_namespace::some_function(int, int)')
     71         self.assertEqual(offset, 0)
     72         self.assertTrue(module[-5:] == b'dummy')
     73 
     74 
     75     def resolve_name(self):
     76         script_dir = os.path.dirname(os.path.realpath(__file__).encode("utf8"))
     77         addr = self.syms.resolve_name(os.path.join(script_dir, b'dummy'),
     78                                       self.mangled_name)
     79         self.assertEqual(addr, self.addr)
     80         pass
     81 
     82 class TestDebuglink(Harness):
     83     def build_command(self):
     84         subprocess.check_output('g++ -o dummy dummy.cc'.split())
     85         lines = subprocess.check_output('nm dummy'.split()).splitlines()
     86         for line in lines:
     87             if b"some_function" in line:
     88                 self.mangled_name = line.split(b' ')[2]
     89                 break
     90         self.assertTrue(self.mangled_name)
     91 
     92     def debug_command(self):
     93         subprocess.check_output('objcopy --add-gnu-debuglink=dummy.debug dummy'
     94                                 .split())
     95 
     96     def tearDown(self):
     97         super(TestDebuglink, self).tearDown()
     98         subprocess.check_output('rm dummy dummy.debug'.split())
     99 
    100     def test_resolve_addr(self):
    101         self.resolve_addr()
    102 
    103     def test_resolve_name(self):
    104         self.resolve_name()
    105 
    106 class TestBuildid(Harness):
    107     def build_command(self):
    108         subprocess.check_output(('g++ -o dummy -Xlinker ' + \
    109                '--build-id=0x123456789abcdef0123456789abcdef012345678 dummy.cc')
    110                .split())
    111         lines = subprocess.check_output('nm dummy'.split()).splitlines()
    112         for line in lines:
    113             if b"some_function" in line:
    114                 self.mangled_name = line.split(b' ')[2]
    115                 break
    116         self.assertTrue(self.mangled_name)
    117 
    118 
    119     def debug_command(self):
    120         subprocess.check_output('mkdir -p /usr/lib/debug/.build-id/12'.split())
    121         subprocess.check_output(('mv dummy.debug /usr/lib/debug/.build-id' + \
    122             '/12/3456789abcdef0123456789abcdef012345678.debug').split())
    123 
    124     def tearDown(self):
    125         super(TestBuildid, self).tearDown()
    126         subprocess.check_output('rm dummy'.split())
    127         subprocess.check_output(('rm /usr/lib/debug/.build-id/12' +
    128             '/3456789abcdef0123456789abcdef012345678.debug').split())
    129 
    130     def test_resolve_name(self):
    131         self.resolve_addr()
    132 
    133     def test_resolve_addr(self):
    134         self.resolve_name()
    135 
    136 if __name__ == "__main__":
    137     main()
    138