Home | History | Annotate | Download | only in tools
      1 #!/usr/bin/env python3
      2 # -*- coding: utf-8 -*-
      3 #
      4 # Copyright (C) 2018 The Android Open Source Project
      5 #
      6 # Licensed under the Apache License, Version 2.0 (the "License");
      7 # you may not use this file except in compliance with the License.
      8 # You may obtain a copy of the License at
      9 #
     10 #      http://www.apache.org/licenses/LICENSE-2.0
     11 #
     12 # Unless required by applicable law or agreed to in writing, software
     13 # distributed under the License is distributed on an "AS IS" BASIS,
     14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     15 # See the License for the specific language governing permissions and
     16 # limitations under the License.
     17 """Unittests for the compiler module."""
     18 
     19 from __future__ import print_function
     20 
     21 import os
     22 import random
     23 import shutil
     24 import tempfile
     25 import unittest
     26 
     27 import arch
     28 import bpf
     29 import compiler
     30 import parser  # pylint: disable=wrong-import-order
     31 
# Architecture description shared by every test case in this module. It is
# loaded once, at import time, from testdata/arch_64.json located next to this
# file (presumably a 64-bit syscall table, going by the name -- confirm against
# the test data).
ARCH_64 = arch.Arch.load_from_json(
    os.path.join(
        os.path.dirname(os.path.abspath(__file__)), 'testdata/arch_64.json'))
     35 
     36 
     37 class CompileFilterStatementTests(unittest.TestCase):
     38     """Tests for PolicyCompiler.compile_filter_statement."""
     39 
     40     def setUp(self):
     41         self.arch = ARCH_64
     42         self.compiler = compiler.PolicyCompiler(self.arch)
     43 
     44     def _compile(self, line):
     45         with tempfile.NamedTemporaryFile(mode='w') as policy_file:
     46             policy_file.write(line)
     47             policy_file.flush()
     48             policy_parser = parser.PolicyParser(
     49                 self.arch, kill_action=bpf.KillProcess())
     50             parsed_policy = policy_parser.parse_file(policy_file.name)
     51             assert len(parsed_policy.filter_statements) == 1
     52             return self.compiler.compile_filter_statement(
     53                 parsed_policy.filter_statements[0],
     54                 kill_action=bpf.KillProcess())
     55 
     56     def test_allow(self):
     57         """Accept lines where the syscall is accepted unconditionally."""
     58         block = self._compile('read: allow')
     59         self.assertEqual(block.filter, None)
     60         self.assertEqual(
     61             block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
     62                            0)[1], 'ALLOW')
     63         self.assertEqual(
     64             block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
     65                            1)[1], 'ALLOW')
     66 
     67     def test_arg0_eq_generated_code(self):
     68         """Accept lines with an argument filter with ==."""
     69         block = self._compile('read: arg0 == 0x100')
     70         # It might be a bit brittle to check the generated code in each test
     71         # case instead of just the behavior, but there should be at least one
     72         # test where this happens.
     73         self.assertEqual(
     74             block.filter.instructions,
     75             [
     76                 bpf.SockFilter(bpf.BPF_LD | bpf.BPF_W | bpf.BPF_ABS, 0, 0,
     77                                bpf.arg_offset(0, True)),
     78                 # Jump to KILL_PROCESS if the high word does not match.
     79                 bpf.SockFilter(bpf.BPF_JMP | bpf.BPF_JEQ | bpf.BPF_K, 0, 2, 0),
     80                 bpf.SockFilter(bpf.BPF_LD | bpf.BPF_W | bpf.BPF_ABS, 0, 0,
     81                                bpf.arg_offset(0, False)),
     82                 # Jump to KILL_PROCESS if the low word does not match.
     83                 bpf.SockFilter(bpf.BPF_JMP | bpf.BPF_JEQ | bpf.BPF_K, 1, 0,
     84                                0x100),
     85                 bpf.SockFilter(bpf.BPF_RET, 0, 0,
     86                                bpf.SECCOMP_RET_KILL_PROCESS),
     87                 bpf.SockFilter(bpf.BPF_RET, 0, 0, bpf.SECCOMP_RET_ALLOW),
     88             ])
     89 
     90     def test_arg0_comparison_operators(self):
     91         """Accept lines with an argument filter with comparison operators."""
     92         biases = (-1, 0, 1)
     93         # For each operator, store the expectations of simulating the program
     94         # against the constant plus each entry from the |biases| array.
     95         cases = (
     96             ('==', ('KILL_PROCESS', 'ALLOW', 'KILL_PROCESS')),
     97             ('!=', ('ALLOW', 'KILL_PROCESS', 'ALLOW')),
     98             ('<', ('ALLOW', 'KILL_PROCESS', 'KILL_PROCESS')),
     99             ('<=', ('ALLOW', 'ALLOW', 'KILL_PROCESS')),
    100             ('>', ('KILL_PROCESS', 'KILL_PROCESS', 'ALLOW')),
    101             ('>=', ('KILL_PROCESS', 'ALLOW', 'ALLOW')),
    102         )
    103         for operator, expectations in cases:
    104             block = self._compile('read: arg0 %s 0x100' % operator)
    105 
    106             # Check the filter's behavior.
    107             for bias, expectation in zip(biases, expectations):
    108                 self.assertEqual(
    109                     block.simulate(self.arch.arch_nr,
    110                                    self.arch.syscalls['read'],
    111                                    0x100 + bias)[1], expectation)
    112 
    113     def test_arg0_mask_operator(self):
    114         """Accept lines with an argument filter with &."""
    115         block = self._compile('read: arg0 & 0x3')
    116 
    117         self.assertEqual(
    118             block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
    119                            0)[1], 'KILL_PROCESS')
    120         self.assertEqual(
    121             block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
    122                            1)[1], 'ALLOW')
    123         self.assertEqual(
    124             block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
    125                            2)[1], 'ALLOW')
    126         self.assertEqual(
    127             block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
    128                            3)[1], 'ALLOW')
    129         self.assertEqual(
    130             block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
    131                            4)[1], 'KILL_PROCESS')
    132         self.assertEqual(
    133             block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
    134                            5)[1], 'ALLOW')
    135         self.assertEqual(
    136             block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
    137                            6)[1], 'ALLOW')
    138         self.assertEqual(
    139             block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
    140                            7)[1], 'ALLOW')
    141         self.assertEqual(
    142             block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
    143                            8)[1], 'KILL_PROCESS')
    144 
    145     def test_arg0_in_operator(self):
    146         """Accept lines with an argument filter with in."""
    147         block = self._compile('read: arg0 in 0x3')
    148 
    149         # The 'in' operator only ensures that no bits outside the mask are set,
    150         # which means that 0 is always allowed.
    151         self.assertEqual(
    152             block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
    153                            0)[1], 'ALLOW')
    154         self.assertEqual(
    155             block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
    156                            1)[1], 'ALLOW')
    157         self.assertEqual(
    158             block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
    159                            2)[1], 'ALLOW')
    160         self.assertEqual(
    161             block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
    162                            3)[1], 'ALLOW')
    163         self.assertEqual(
    164             block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
    165                            4)[1], 'KILL_PROCESS')
    166         self.assertEqual(
    167             block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
    168                            5)[1], 'KILL_PROCESS')
    169         self.assertEqual(
    170             block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
    171                            6)[1], 'KILL_PROCESS')
    172         self.assertEqual(
    173             block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
    174                            7)[1], 'KILL_PROCESS')
    175         self.assertEqual(
    176             block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
    177                            8)[1], 'KILL_PROCESS')
    178 
    179     def test_arg0_short_gt_ge_comparisons(self):
    180         """Ensure that the short comparison optimization kicks in."""
    181         if self.arch.bits == 32:
    182             return
    183         short_constant_str = '0xdeadbeef'
    184         short_constant = int(short_constant_str, base=0)
    185         long_constant_str = '0xbadc0ffee0ddf00d'
    186         long_constant = int(long_constant_str, base=0)
    187         biases = (-1, 0, 1)
    188         # For each operator, store the expectations of simulating the program
    189         # against the constant plus each entry from the |biases| array.
    190         cases = (
    191             ('<', ('ALLOW', 'KILL_PROCESS', 'KILL_PROCESS')),
    192             ('<=', ('ALLOW', 'ALLOW', 'KILL_PROCESS')),
    193             ('>', ('KILL_PROCESS', 'KILL_PROCESS', 'ALLOW')),
    194             ('>=', ('KILL_PROCESS', 'ALLOW', 'ALLOW')),
    195         )
    196         for operator, expectations in cases:
    197             short_block = self._compile(
    198                 'read: arg0 %s %s' % (operator, short_constant_str))
    199             long_block = self._compile(
    200                 'read: arg0 %s %s' % (operator, long_constant_str))
    201 
    202             # Check that the emitted code is shorter when the high word of the
    203             # constant is zero.
    204             self.assertLess(
    205                 len(short_block.filter.instructions),
    206                 len(long_block.filter.instructions))
    207 
    208             # Check the filter's behavior.
    209             for bias, expectation in zip(biases, expectations):
    210                 self.assertEqual(
    211                     long_block.simulate(self.arch.arch_nr,
    212                                         self.arch.syscalls['read'],
    213                                         long_constant + bias)[1], expectation)
    214                 self.assertEqual(
    215                     short_block.simulate(
    216                         self.arch.arch_nr, self.arch.syscalls['read'],
    217                         short_constant + bias)[1], expectation)
    218 
    219     def test_and_or(self):
    220         """Accept lines with a complex expression in DNF."""
    221         block = self._compile('read: arg0 == 0 && arg1 == 0 || arg0 == 1')
    222 
    223         self.assertEqual(
    224             block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 0,
    225                            0)[1], 'ALLOW')
    226         self.assertEqual(
    227             block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 0,
    228                            1)[1], 'KILL_PROCESS')
    229         self.assertEqual(
    230             block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 1,
    231                            0)[1], 'ALLOW')
    232         self.assertEqual(
    233             block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 1,
    234                            1)[1], 'ALLOW')
    235 
    236     def test_trap(self):
    237         """Accept lines that trap unconditionally."""
    238         block = self._compile('read: trap')
    239 
    240         self.assertEqual(
    241             block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
    242                            0)[1], 'TRAP')
    243 
    244     def test_ret_errno(self):
    245         """Accept lines that return errno."""
    246         block = self._compile('read : arg0 == 0 || arg0 == 1 ; return 1')
    247 
    248         self.assertEqual(
    249             block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
    250                            0)[1:], ('ERRNO', 1))
    251         self.assertEqual(
    252             block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
    253                            1)[1:], ('ERRNO', 1))
    254         self.assertEqual(
    255             block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
    256                            2)[1], 'KILL_PROCESS')
    257 
    258     def test_ret_errno_unconditionally(self):
    259         """Accept lines that return errno unconditionally."""
    260         block = self._compile('read: return 1')
    261 
    262         self.assertEqual(
    263             block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
    264                            0)[1:], ('ERRNO', 1))
    265 
    266     def test_trace(self):
    267         """Accept lines that trace unconditionally."""
    268         block = self._compile('read: trace')
    269 
    270         self.assertEqual(
    271             block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
    272                            0)[1], 'TRACE')
    273 
    274     def test_log(self):
    275         """Accept lines that log unconditionally."""
    276         block = self._compile('read: log')
    277 
    278         self.assertEqual(
    279             block.simulate(self.arch.arch_nr, self.arch.syscalls['read'],
    280                            0)[1], 'LOG')
    281 
    282     def test_mmap_write_xor_exec(self):
    283         """Accept the idiomatic filter for mmap."""
    284         block = self._compile(
    285             'read : arg0 in ~PROT_WRITE || arg0 in ~PROT_EXEC')
    286 
    287         prot_exec_and_write = 6
    288         for prot in range(0, 0xf):
    289             if (prot & prot_exec_and_write) == prot_exec_and_write:
    290                 self.assertEqual(
    291                     block.simulate(self.arch.arch_nr,
    292                                    self.arch.syscalls['read'], prot)[1],
    293                     'KILL_PROCESS')
    294             else:
    295                 self.assertEqual(
    296                     block.simulate(self.arch.arch_nr,
    297                                    self.arch.syscalls['read'], prot)[1],
    298                     'ALLOW')
    299 
    300 
    301 class CompileFileTests(unittest.TestCase):
    302     """Tests for PolicyCompiler.compile_file."""
    303 
    304     def setUp(self):
    305         self.arch = ARCH_64
    306         self.compiler = compiler.PolicyCompiler(self.arch)
    307         self.tempdir = tempfile.mkdtemp()
    308 
    309     def tearDown(self):
    310         shutil.rmtree(self.tempdir)
    311 
    312     def _write_file(self, filename, contents):
    313         """Helper to write out a file for testing."""
    314         path = os.path.join(self.tempdir, filename)
    315         with open(path, 'w') as outf:
    316             outf.write(contents)
    317         return path
    318 
    319     def test_compile(self):
    320         """Ensure compilation works with all strategies."""
    321         self._write_file(
    322             'test.frequency', """
    323             read: 1
    324             close: 10
    325         """)
    326         path = self._write_file(
    327             'test.policy', """
    328             @frequency ./test.frequency
    329             read: 1
    330             close: 1
    331         """)
    332 
    333         program = self.compiler.compile_file(
    334             path,
    335             optimization_strategy=compiler.OptimizationStrategy.LINEAR,
    336             kill_action=bpf.KillProcess())
    337         self.assertGreater(
    338             bpf.simulate(program.instructions, self.arch.arch_nr,
    339                          self.arch.syscalls['read'], 0)[0],
    340             bpf.simulate(program.instructions, self.arch.arch_nr,
    341                          self.arch.syscalls['close'], 0)[0],
    342         )
    343 
    344     def test_compile_bst(self):
    345         """Ensure compilation with BST is cheaper than the linear model."""
    346         self._write_file(
    347             'test.frequency', """
    348             read: 1
    349             close: 10
    350         """)
    351         path = self._write_file(
    352             'test.policy', """
    353             @frequency ./test.frequency
    354             read: 1
    355             close: 1
    356         """)
    357 
    358         for strategy in list(compiler.OptimizationStrategy):
    359             program = self.compiler.compile_file(
    360                 path,
    361                 optimization_strategy=strategy,
    362                 kill_action=bpf.KillProcess())
    363             self.assertGreater(
    364                 bpf.simulate(program.instructions, self.arch.arch_nr,
    365                              self.arch.syscalls['read'], 0)[0],
    366                 bpf.simulate(program.instructions, self.arch.arch_nr,
    367                              self.arch.syscalls['close'], 0)[0],
    368             )
    369             self.assertEqual(
    370                 bpf.simulate(program.instructions, self.arch.arch_nr,
    371                              self.arch.syscalls['read'], 0)[1], 'ALLOW')
    372             self.assertEqual(
    373                 bpf.simulate(program.instructions, self.arch.arch_nr,
    374                              self.arch.syscalls['close'], 0)[1], 'ALLOW')
    375 
    376     def test_compile_empty_file(self):
    377         """Accept empty files."""
    378         path = self._write_file(
    379             'test.policy', """
    380             @default kill-thread
    381         """)
    382 
    383         for strategy in list(compiler.OptimizationStrategy):
    384             program = self.compiler.compile_file(
    385                 path,
    386                 optimization_strategy=strategy,
    387                 kill_action=bpf.KillProcess())
    388             self.assertEqual(
    389                 bpf.simulate(program.instructions, self.arch.arch_nr,
    390                              self.arch.syscalls['read'], 0)[1], 'KILL_THREAD')
    391 
    392     def test_compile_simulate(self):
    393         """Ensure policy reflects script by testing some random scripts."""
    394         iterations = 5
    395         for i in range(iterations):
    396             num_entries = 64 * (i + 1) // iterations
    397             syscalls = dict(
    398                 zip(
    399                     random.sample(self.arch.syscalls.keys(), num_entries),
    400                     (random.randint(1, 1024) for _ in range(num_entries)),
    401                 ))
    402 
    403             frequency_contents = '\n'.join(
    404                 '%s: %d' % s for s in syscalls.items())
    405             policy_contents = '@frequency ./test.frequency\n' + '\n'.join(
    406                 '%s: 1' % s[0] for s in syscalls.items())
    407 
    408             self._write_file('test.frequency', frequency_contents)
    409             path = self._write_file('test.policy', policy_contents)
    410 
    411             for strategy in list(compiler.OptimizationStrategy):
    412                 program = self.compiler.compile_file(
    413                     path,
    414                     optimization_strategy=strategy,
    415                     kill_action=bpf.KillProcess())
    416                 for name, number in self.arch.syscalls.items():
    417                     expected_result = ('ALLOW'
    418                                        if name in syscalls else 'KILL_PROCESS')
    419                     self.assertEqual(
    420                         bpf.simulate(program.instructions, self.arch.arch_nr,
    421                                      number, 0)[1], expected_result,
    422                         ('syscall name: %s, syscall number: %d, '
    423                          'strategy: %s, policy:\n%s') %
    424                         (name, number, strategy, policy_contents))
    425 
    426     @unittest.skipIf(not int(os.getenv('SLOW_TESTS', '0')), 'slow')
    427     def test_compile_huge_policy(self):
    428         """Ensure jumps while compiling a huge policy are still valid."""
    429         # Given that the BST strategy is O(n^3), don't choose a crazy large
    430         # value, but it still needs to be around 128 so that we exercise the
    431         # codegen paths that depend on the length of the jump.
    432         #
    433         # Immediate jump offsets in BPF comparison instructions are limited to
    434         # 256 instructions, so given that every syscall filter consists of a
    435         # load and jump instructions, with 128 syscalls there will be at least
    436         # one jump that's further than 256 instructions.
    437         num_entries = 128
    438         syscalls = dict(random.sample(self.arch.syscalls.items(), num_entries))
    439         # Here we force every single filter to be distinct. Otherwise the
    440         # codegen layer will coalesce filters that compile to the same
    441         # instructions.
    442         policy_contents = '\n'.join(
    443             '%s: arg0 == %d' % s for s in syscalls.items())
    444 
    445         path = self._write_file('test.policy', policy_contents)
    446 
    447         program = self.compiler.compile_file(
    448             path,
    449             optimization_strategy=compiler.OptimizationStrategy.BST,
    450             kill_action=bpf.KillProcess())
    451         for name, number in self.arch.syscalls.items():
    452             expected_result = ('ALLOW'
    453                                if name in syscalls else 'KILL_PROCESS')
    454             self.assertEqual(
    455                 bpf.simulate(program.instructions, self.arch.arch_nr,
    456                              self.arch.syscalls[name], number)[1],
    457                 expected_result)
    458             self.assertEqual(
    459                 bpf.simulate(program.instructions, self.arch.arch_nr,
    460                              self.arch.syscalls[name], number + 1)[1],
    461                 'KILL_PROCESS')
    462 
    463     def test_compile_huge_filter(self):
    464         """Ensure jumps while compiling a huge policy are still valid."""
    465         # This is intended to force cases where the AST visitation would result
    466         # in a combinatorial explosion of calls to Block.accept(). An optimized
    467         # implementation should be O(n).
    468         num_entries = 128
    469         syscalls = {}
    470         # Here we force every single filter to be distinct. Otherwise the
    471         # codegen layer will coalesce filters that compile to the same
    472         # instructions.
    473         policy_contents = []
    474         for name in random.sample(self.arch.syscalls.keys(), num_entries):
    475             values = random.sample(range(1024), num_entries)
    476             syscalls[name] = values
    477             policy_contents.append(
    478                 '%s: %s' % (name, ' || '.join('arg0 == %d' % value
    479                                               for value in values)))
    480 
    481         path = self._write_file('test.policy', '\n'.join(policy_contents))
    482 
    483         program = self.compiler.compile_file(
    484             path,
    485             optimization_strategy=compiler.OptimizationStrategy.LINEAR,
    486             kill_action=bpf.KillProcess())
    487         for name, values in syscalls.items():
    488             self.assertEqual(
    489                 bpf.simulate(program.instructions,
    490                              self.arch.arch_nr, self.arch.syscalls[name],
    491                              random.choice(values))[1], 'ALLOW')
    492             self.assertEqual(
    493                 bpf.simulate(program.instructions, self.arch.arch_nr,
    494                              self.arch.syscalls[name], 1025)[1],
    495                 'KILL_PROCESS')
    496 
    497 
# Run every test case in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
    500