Home | History | Annotate | Download | only in python
      1 #!/usr/bin/env python
      2 #
      3 # USAGE: test_usdt3.py
      4 #
      5 # Copyright 2018 Facebook, Inc
      6 # Licensed under the Apache License, Version 2.0 (the "License")
      7 
      8 from __future__ import print_function
      9 from bcc import BPF, USDT
     10 from unittest import main, TestCase
     11 from subprocess import Popen, PIPE
     12 import ctypes as ct
     13 import inspect, os, tempfile
     14 
     15 class TestUDST(TestCase):
     16     def setUp(self):
     17         common_h = b"""
     18 #include "folly/tracing/StaticTracepoint.h"
     19 
     20 static inline void record_val(int val)
     21 {
     22   FOLLY_SDT(test, probe, val);
     23 }
     24 
     25 extern void record_a(int val);
     26 extern void record_b(int val);
     27 """
     28 
     29         a_c = b"""
     30 #include <stdio.h>
     31 #include "common.h"
     32 
     33 void record_a(int val)
     34 {
     35     record_val(val);
     36 }
     37 """
     38 
     39         b_c = b"""
     40 #include <stdio.h>
     41 #include "common.h"
     42 
     43 void record_b(int val)
     44 {
     45     record_val(val);
     46 }
     47 """
     48 
     49         m_c = b"""
     50 #include <stdio.h>
     51 #include <unistd.h>
     52 #include "common.h"
     53 
     54 int main() {
     55    while (1) {
     56      record_a(1);
     57      record_b(2);
     58      record_val(3);
     59      sleep(1);
     60    }
     61    return 0;
     62 }
     63 """
     64         # BPF program
     65         self.bpf_text = """
     66 BPF_PERF_OUTPUT(event);
     67 int do_trace(struct pt_regs *ctx) {
     68     int result = 0;
     69     bpf_usdt_readarg(1, ctx, &result);
     70     event.perf_submit(ctx, &result, sizeof(result));
     71     return 0;
     72 };
     73 """
     74 
     75         def _create_file(name, text):
     76             text_file = open(name, "wb")
     77             text_file.write(text)
     78             text_file.close()
     79 
     80         # Create source files
     81         self.tmp_dir = tempfile.mkdtemp()
     82         print("temp directory: " + self.tmp_dir)
     83         _create_file(self.tmp_dir + "/common.h", common_h)
     84         _create_file(self.tmp_dir + "/a.c", a_c)
     85         _create_file(self.tmp_dir + "/b.c", b_c)
     86         _create_file(self.tmp_dir + "/m.c", m_c)
     87 
     88         # Compilation
     89         # the usdt test:probe exists in liba.so, libb.so and a.out
     90         include_path = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) + "/include"
     91         a_src = self.tmp_dir + "/a.c"
     92         a_obj = self.tmp_dir + "/a.o"
     93         a_lib = self.tmp_dir + "/liba.so"
     94         b_src = self.tmp_dir + "/b.c"
     95         b_obj = self.tmp_dir + "/b.o"
     96         b_lib = self.tmp_dir + "/libb.so"
     97         m_src = self.tmp_dir + "/m.c"
     98         m_bin = self.tmp_dir + "/a.out"
     99         m_linker_opt = " -L" + self.tmp_dir + " -la -lb"
    100         self.assertEqual(os.system("gcc -I" + include_path + " -fpic -c -o " + a_obj + " " + a_src), 0)
    101         self.assertEqual(os.system("gcc -I" + include_path + " -fpic -c -o " + b_obj + " " + b_src), 0)
    102         self.assertEqual(os.system("gcc -shared -o " + a_lib + " " + a_obj), 0)
    103         self.assertEqual(os.system("gcc -shared -o " + b_lib + " " + b_obj), 0)
    104         self.assertEqual(os.system("gcc -I" + include_path + " " + m_src + " -o " + m_bin + m_linker_opt), 0)
    105 
    106         # Run the application
    107         self.app = Popen([m_bin], env=dict(os.environ, LD_LIBRARY_PATH=self.tmp_dir))
    108         # os.system("tplist.py -vvv -p " + str(self.app.pid))
    109 
    110     def test_attach1(self):
    111         # enable USDT probe from given PID and verifier generated BPF programs
    112         u = USDT(pid=int(self.app.pid))
    113         u.enable_probe(probe="probe", fn_name="do_trace")
    114         b = BPF(text=self.bpf_text, usdt_contexts=[u])
    115 
    116         # processing events
    117         self.probe_value_1 = 0
    118         self.probe_value_2 = 0
    119         self.probe_value_3 = 0
    120         self.probe_value_other = 0
    121 
    122         def print_event(cpu, data, size):
    123             result = ct.cast(data, ct.POINTER(ct.c_int)).contents
    124             if result.value == 1:
    125                 self.probe_value_1 = 1
    126             elif result.value == 2:
    127                 self.probe_value_2 = 1
    128             elif result.value == 3:
    129                 self.probe_value_3 = 1
    130             else:
    131                 self.probe_value_other = 1
    132 
    133         b["event"].open_perf_buffer(print_event)
    134         for i in range(100):
    135             if (self.probe_value_1 == 0 or
    136                 self.probe_value_2 == 0 or
    137                 self.probe_value_3 == 0 or
    138                 self.probe_value_other != 0):
    139                 b.perf_buffer_poll()
    140             else:
    141                 break;
    142 
    143         self.assertTrue(self.probe_value_1 != 0)
    144         self.assertTrue(self.probe_value_2 != 0)
    145         self.assertTrue(self.probe_value_3 != 0)
    146         self.assertTrue(self.probe_value_other == 0)
    147 
    148     def tearDown(self):
    149         # kill the subprocess, clean the environment
    150         self.app.kill()
    151         self.app.wait()
    152         os.system("rm -rf " + self.tmp_dir)
    153 
    154 if __name__ == "__main__":
    155     main()
    156