#!/usr/bin/env python
# Copyright (c) PLUMgrid, Inc.
# Licensed under the Apache License, Version 2.0 (the "License")

from bcc import BPF
import ctypes as ct
import random
import time
import subprocess
from bcc.utils import get_online_cpus
from unittest import main, TestCase


class TestArray(TestCase):
    """Tests for BPF array tables and perf output buffers exposed via bcc."""

    def test_simple(self):
        """Write two slots of a 128-entry u64 array using ctypes keys and read them back."""
        b = BPF(text="""BPF_ARRAY(table1, u64, 128);""")
        t1 = b["table1"]
        t1[ct.c_int(0)] = ct.c_ulonglong(100)
        t1[ct.c_int(127)] = ct.c_ulonglong(1000)
        for i, v in t1.items():
            if i.value == 0:
                self.assertEqual(v.value, 100)
            if i.value == 127:
                self.assertEqual(v.value, 1000)
        self.assertEqual(len(t1), 128)

    def test_native_type(self):
        """Same as test_simple, but index the table with plain Python ints.

        Also checks that negative indices are accepted: ``t1[-2]`` round-trips
        its value and ``t1[-1]`` reads the same slot as ``t1[127]``.
        """
        b = BPF(text="""BPF_ARRAY(table1, u64, 128);""")
        t1 = b["table1"]
        t1[0] = ct.c_ulonglong(100)
        t1[-2] = ct.c_ulonglong(37)
        t1[127] = ct.c_ulonglong(1000)
        for i, v in t1.items():
            if i.value == 0:
                self.assertEqual(v.value, 100)
            if i.value == 127:
                self.assertEqual(v.value, 1000)
        self.assertEqual(len(t1), 128)
        self.assertEqual(t1[-2].value, 37)
        self.assertEqual(t1[-1].value, t1[127].value)

    def test_perf_buffer(self):
        """Submit an event from a nanosleep kprobe and expect at least one callback."""
        self.counter = 0

        class Data(ct.Structure):
            _fields_ = [("ts", ct.c_ulonglong)]

        def cb(cpu, data, size):
            self.assertGreater(size, ct.sizeof(Data))
            # Decoded only to exercise the cast; the payload is not inspected.
            event = ct.cast(data, ct.POINTER(Data)).contents
            self.counter += 1

        def lost_cb(lost):
            self.assertGreater(lost, 0)

        text = """
BPF_PERF_OUTPUT(events);
int do_sys_nanosleep(void *ctx) {
    struct {
        u64 ts;
    } data = {bpf_ktime_get_ns()};
    events.perf_submit(ctx, &data, sizeof(data));
    return 0;
}
"""
        b = BPF(text=text)
        # Register cleanup up front so it still runs when attaching fails or
        # an assertion below raises.
        self.addCleanup(b.cleanup)
        b.attach_kprobe(event=b.get_syscall_fnname("nanosleep"),
                        fn_name="do_sys_nanosleep")
        b["events"].open_perf_buffer(cb, lost_cb=lost_cb)
        # Run a short sleep so the probed nanosleep syscall fires.
        subprocess.call(['sleep', '0.1'])
        b.perf_buffer_poll()
        self.assertGreater(self.counter, 0)

    def test_perf_buffer_for_each_cpu(self):
        """Run a pinned ``sleep`` on every online CPU and expect one event per CPU."""
        self.events = []

        class Data(ct.Structure):
            _fields_ = [("cpu", ct.c_ulonglong)]

        def cb(cpu, data, size):
            self.assertGreater(size, ct.sizeof(Data))
            event = ct.cast(data, ct.POINTER(Data)).contents
            self.events.append(event)

        def lost_cb(lost):
            self.assertGreater(lost, 0)

        text = """
BPF_PERF_OUTPUT(events);
int do_sys_nanosleep(void *ctx) {
    struct {
        u64 cpu;
    } data = {bpf_get_smp_processor_id()};
    events.perf_submit(ctx, &data, sizeof(data));
    return 0;
}
"""
        b = BPF(text=text)
        # Register cleanup up front so it still runs when attaching fails or
        # an assertion below raises.
        self.addCleanup(b.cleanup)
        b.attach_kprobe(event=b.get_syscall_fnname("nanosleep"),
                        fn_name="do_sys_nanosleep")
        b["events"].open_perf_buffer(cb, lost_cb=lost_cb)
        online_cpus = get_online_cpus()
        for cpu in online_cpus:
            # taskset pins the sleep to one CPU so each CPU submits an event.
            subprocess.call(['taskset', '-c', str(cpu), 'sleep', '0.1'])
        b.perf_buffer_poll()
        self.assertGreaterEqual(
            len(self.events), len(online_cpus),
            'Received only {}/{} events'.format(len(self.events),
                                                len(online_cpus)))


if __name__ == "__main__":
    main()