Home | History | Annotate | Download | only in python
      1 #!/usr/bin/env python
      2 # Copyright (c) PLUMgrid, Inc.
      3 # Licensed under the Apache License, Version 2.0 (the "License")
      4 
      5 from bcc import BPF
      6 import os
      7 import sys
      8 from unittest import main, TestCase
      9 
     10 class TestKprobeRgx(TestCase):
     11     def setUp(self):
     12         self.b = BPF(text=b"""
     13         typedef struct { int idx; } Key;
     14         typedef struct { u64 val; } Val;
     15         BPF_HASH(stats, Key, Val, 3);
     16         int hello(void *ctx) {
     17           stats.lookup_or_init(&(Key){1}, &(Val){0})->val++;
     18           return 0;
     19         }
     20         int goodbye(void *ctx) {
     21           stats.lookup_or_init(&(Key){2}, &(Val){0})->val++;
     22           return 0;
     23         }
     24         """)
     25         self.b.attach_kprobe(event_re=b"^" + self.b.get_syscall_prefix() + b"bp.*",
     26                              fn_name=b"hello")
     27         self.b.attach_kretprobe(event_re=b"^" + self.b.get_syscall_prefix() + b"bp.*",
     28                                 fn_name=b"goodbye")
     29 
     30     def test_send1(self):
     31         k1 = self.b[b"stats"].Key(1)
     32         k2 = self.b[b"stats"].Key(2)
     33         self.assertTrue(self.b[b"stats"][k1].val >= 2)
     34         self.assertTrue(self.b[b"stats"][k2].val == 1)
     35 
     36 class TestKprobeReplace(TestCase):
     37     def setUp(self):
     38         self.b = BPF(text=b"int empty(void *ctx) { return 0; }")
     39 
     40     def test_periods(self):
     41         self.b.attach_kprobe(event_re=b"^tcp_enter_cwr.*", fn_name=b"empty")
     42 
     43 if __name__ == "__main__":
     44     main()
     45