#!/usr/bin/env python
# Copyright (c) PLUMgrid, Inc.
# Licensed under the Apache License, Version 2.0 (the "License")
      4 
      5 from bcc import BPF
      6 import ctypes as ct
      7 import random
      8 import time
      9 import subprocess
     10 from bcc.utils import get_online_cpus
     11 from unittest import main, TestCase
     12 
     13 class TestArray(TestCase):
     14     def test_simple(self):
     15         b = BPF(text="""BPF_ARRAY(table1, u64, 128);""")
     16         t1 = b["table1"]
     17         t1[ct.c_int(0)] = ct.c_ulonglong(100)
     18         t1[ct.c_int(127)] = ct.c_ulonglong(1000)
     19         for i, v in t1.items():
     20             if i.value == 0:
     21                 self.assertEqual(v.value, 100)
     22             if i.value == 127:
     23                 self.assertEqual(v.value, 1000)
     24         self.assertEqual(len(t1), 128)
     25 
     26     def test_native_type(self):
     27         b = BPF(text="""BPF_ARRAY(table1, u64, 128);""")
     28         t1 = b["table1"]
     29         t1[0] = ct.c_ulonglong(100)
     30         t1[-2] = ct.c_ulonglong(37)
     31         t1[127] = ct.c_ulonglong(1000)
     32         for i, v in t1.items():
     33             if i.value == 0:
     34                 self.assertEqual(v.value, 100)
     35             if i.value == 127:
     36                 self.assertEqual(v.value, 1000)
     37         self.assertEqual(len(t1), 128)
     38         self.assertEqual(t1[-2].value, 37)
     39         self.assertEqual(t1[-1].value, t1[127].value)
     40 
     41     def test_perf_buffer(self):
     42         self.counter = 0
     43 
     44         class Data(ct.Structure):
     45             _fields_ = [("ts", ct.c_ulonglong)]
     46 
     47         def cb(cpu, data, size):
     48             self.assertGreater(size, ct.sizeof(Data))
     49             event = ct.cast(data, ct.POINTER(Data)).contents
     50             self.counter += 1
     51 
     52         def lost_cb(lost):
     53             self.assertGreater(lost, 0)
     54 
     55         text = """
     56 BPF_PERF_OUTPUT(events);
     57 int do_sys_nanosleep(void *ctx) {
     58     struct {
     59         u64 ts;
     60     } data = {bpf_ktime_get_ns()};
     61     events.perf_submit(ctx, &data, sizeof(data));
     62     return 0;
     63 }
     64 """
     65         b = BPF(text=text)
     66         b.attach_kprobe(event=b.get_syscall_fnname("nanosleep"),
     67                         fn_name="do_sys_nanosleep")
     68         b["events"].open_perf_buffer(cb, lost_cb=lost_cb)
     69         subprocess.call(['sleep', '0.1'])
     70         b.perf_buffer_poll()
     71         self.assertGreater(self.counter, 0)
     72         b.cleanup()
     73 
     74     def test_perf_buffer_for_each_cpu(self):
     75         self.events = []
     76 
     77         class Data(ct.Structure):
     78             _fields_ = [("cpu", ct.c_ulonglong)]
     79 
     80         def cb(cpu, data, size):
     81             self.assertGreater(size, ct.sizeof(Data))
     82             event = ct.cast(data, ct.POINTER(Data)).contents
     83             self.events.append(event)
     84 
     85         def lost_cb(lost):
     86             self.assertGreater(lost, 0)
     87 
     88         text = """
     89 BPF_PERF_OUTPUT(events);
     90 int do_sys_nanosleep(void *ctx) {
     91     struct {
     92         u64 cpu;
     93     } data = {bpf_get_smp_processor_id()};
     94     events.perf_submit(ctx, &data, sizeof(data));
     95     return 0;
     96 }
     97 """
     98         b = BPF(text=text)
     99         b.attach_kprobe(event=b.get_syscall_fnname("nanosleep"),
    100                         fn_name="do_sys_nanosleep")
    101         b["events"].open_perf_buffer(cb, lost_cb=lost_cb)
    102         online_cpus = get_online_cpus()
    103         for cpu in online_cpus:
    104             subprocess.call(['taskset', '-c', str(cpu), 'sleep', '0.1'])
    105         b.perf_buffer_poll()
    106         b.cleanup()
    107         self.assertGreaterEqual(len(self.events), len(online_cpus), 'Received only {}/{} events'.format(len(self.events), len(online_cpus)))
    108 
    109 if __name__ == "__main__":
    110     main()
    111