1 #!/usr/bin/env python3 2 # -*- coding: utf-8 -*- 3 # 4 # Copyright (C) 2018 The Android Open Source Project 5 # 6 # Licensed under the Apache License, Version 2.0 (the "License"); 7 # you may not use this file except in compliance with the License. 8 # You may obtain a copy of the License at 9 # 10 # http://www.apache.org/licenses/LICENSE-2.0 11 # 12 # Unless required by applicable law or agreed to in writing, software 13 # distributed under the License is distributed on an "AS IS" BASIS, 14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 # See the License for the specific language governing permissions and 16 # limitations under the License. 17 """Unittests for the compiler module.""" 18 19 from __future__ import print_function 20 21 import os 22 import random 23 import shutil 24 import tempfile 25 import unittest 26 27 import arch 28 import bpf 29 import compiler 30 import parser # pylint: disable=wrong-import-order 31 32 ARCH_64 = arch.Arch.load_from_json( 33 os.path.join( 34 os.path.dirname(os.path.abspath(__file__)), 'testdata/arch_64.json')) 35 36 37 class CompileFilterStatementTests(unittest.TestCase): 38 """Tests for PolicyCompiler.compile_filter_statement.""" 39 40 def setUp(self): 41 self.arch = ARCH_64 42 self.compiler = compiler.PolicyCompiler(self.arch) 43 44 def _compile(self, line): 45 with tempfile.NamedTemporaryFile(mode='w') as policy_file: 46 policy_file.write(line) 47 policy_file.flush() 48 policy_parser = parser.PolicyParser( 49 self.arch, kill_action=bpf.KillProcess()) 50 parsed_policy = policy_parser.parse_file(policy_file.name) 51 assert len(parsed_policy.filter_statements) == 1 52 return self.compiler.compile_filter_statement( 53 parsed_policy.filter_statements[0], 54 kill_action=bpf.KillProcess()) 55 56 def test_allow(self): 57 """Accept lines where the syscall is accepted unconditionally.""" 58 block = self._compile('read: allow') 59 self.assertEqual(block.filter, None) 60 self.assertEqual( 61 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 62 0)[1], 'ALLOW') 63 self.assertEqual( 64 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 65 1)[1], 'ALLOW') 66 67 def test_arg0_eq_generated_code(self): 68 """Accept lines with an argument filter with ==.""" 69 block = self._compile('read: arg0 == 0x100') 70 # It might be a bit brittle to check the generated code in each test 71 # case instead of just the behavior, but there should be at least one 72 # test where this happens. 73 self.assertEqual( 74 block.filter.instructions, 75 [ 76 bpf.SockFilter(bpf.BPF_LD | bpf.BPF_W | bpf.BPF_ABS, 0, 0, 77 bpf.arg_offset(0, True)), 78 # Jump to KILL_PROCESS if the high word does not match. 79 bpf.SockFilter(bpf.BPF_JMP | bpf.BPF_JEQ | bpf.BPF_K, 0, 2, 0), 80 bpf.SockFilter(bpf.BPF_LD | bpf.BPF_W | bpf.BPF_ABS, 0, 0, 81 bpf.arg_offset(0, False)), 82 # Jump to KILL_PROCESS if the low word does not match. 83 bpf.SockFilter(bpf.BPF_JMP | bpf.BPF_JEQ | bpf.BPF_K, 1, 0, 84 0x100), 85 bpf.SockFilter(bpf.BPF_RET, 0, 0, 86 bpf.SECCOMP_RET_KILL_PROCESS), 87 bpf.SockFilter(bpf.BPF_RET, 0, 0, bpf.SECCOMP_RET_ALLOW), 88 ]) 89 90 def test_arg0_comparison_operators(self): 91 """Accept lines with an argument filter with comparison operators.""" 92 biases = (-1, 0, 1) 93 # For each operator, store the expectations of simulating the program 94 # against the constant plus each entry from the |biases| array. 95 cases = ( 96 ('==', ('KILL_PROCESS', 'ALLOW', 'KILL_PROCESS')), 97 ('!=', ('ALLOW', 'KILL_PROCESS', 'ALLOW')), 98 ('<', ('ALLOW', 'KILL_PROCESS', 'KILL_PROCESS')), 99 ('<=', ('ALLOW', 'ALLOW', 'KILL_PROCESS')), 100 ('>', ('KILL_PROCESS', 'KILL_PROCESS', 'ALLOW')), 101 ('>=', ('KILL_PROCESS', 'ALLOW', 'ALLOW')), 102 ) 103 for operator, expectations in cases: 104 block = self._compile('read: arg0 %s 0x100' % operator) 105 106 # Check the filter's behavior. 107 for bias, expectation in zip(biases, expectations): 108 self.assertEqual( 109 block.simulate(self.arch.arch_nr, 110 self.arch.syscalls['read'], 111 0x100 + bias)[1], expectation) 112 113 def test_arg0_mask_operator(self): 114 """Accept lines with an argument filter with &.""" 115 block = self._compile('read: arg0 & 0x3') 116 117 self.assertEqual( 118 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 119 0)[1], 'KILL_PROCESS') 120 self.assertEqual( 121 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 122 1)[1], 'ALLOW') 123 self.assertEqual( 124 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 125 2)[1], 'ALLOW') 126 self.assertEqual( 127 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 128 3)[1], 'ALLOW') 129 self.assertEqual( 130 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 131 4)[1], 'KILL_PROCESS') 132 self.assertEqual( 133 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 134 5)[1], 'ALLOW') 135 self.assertEqual( 136 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 137 6)[1], 'ALLOW') 138 self.assertEqual( 139 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 140 7)[1], 'ALLOW') 141 self.assertEqual( 142 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 143 8)[1], 'KILL_PROCESS') 144 145 def test_arg0_in_operator(self): 146 """Accept lines with an argument filter with in.""" 147 block = self._compile('read: arg0 in 0x3') 148 149 # The 'in' operator only ensures that no bits outside the mask are set, 150 # which means that 0 is always allowed. 151 self.assertEqual( 152 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 153 0)[1], 'ALLOW') 154 self.assertEqual( 155 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 156 1)[1], 'ALLOW') 157 self.assertEqual( 158 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 159 2)[1], 'ALLOW') 160 self.assertEqual( 161 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 162 3)[1], 'ALLOW') 163 self.assertEqual( 164 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 165 4)[1], 'KILL_PROCESS') 166 self.assertEqual( 167 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 168 5)[1], 'KILL_PROCESS') 169 self.assertEqual( 170 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 171 6)[1], 'KILL_PROCESS') 172 self.assertEqual( 173 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 174 7)[1], 'KILL_PROCESS') 175 self.assertEqual( 176 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 177 8)[1], 'KILL_PROCESS') 178 179 def test_arg0_short_gt_ge_comparisons(self): 180 """Ensure that the short comparison optimization kicks in.""" 181 if self.arch.bits == 32: 182 return 183 short_constant_str = '0xdeadbeef' 184 short_constant = int(short_constant_str, base=0) 185 long_constant_str = '0xbadc0ffee0ddf00d' 186 long_constant = int(long_constant_str, base=0) 187 biases = (-1, 0, 1) 188 # For each operator, store the expectations of simulating the program 189 # against the constant plus each entry from the |biases| array. 190 cases = ( 191 ('<', ('ALLOW', 'KILL_PROCESS', 'KILL_PROCESS')), 192 ('<=', ('ALLOW', 'ALLOW', 'KILL_PROCESS')), 193 ('>', ('KILL_PROCESS', 'KILL_PROCESS', 'ALLOW')), 194 ('>=', ('KILL_PROCESS', 'ALLOW', 'ALLOW')), 195 ) 196 for operator, expectations in cases: 197 short_block = self._compile( 198 'read: arg0 %s %s' % (operator, short_constant_str)) 199 long_block = self._compile( 200 'read: arg0 %s %s' % (operator, long_constant_str)) 201 202 # Check that the emitted code is shorter when the high word of the 203 # constant is zero. 204 self.assertLess( 205 len(short_block.filter.instructions), 206 len(long_block.filter.instructions)) 207 208 # Check the filter's behavior. 209 for bias, expectation in zip(biases, expectations): 210 self.assertEqual( 211 long_block.simulate(self.arch.arch_nr, 212 self.arch.syscalls['read'], 213 long_constant + bias)[1], expectation) 214 self.assertEqual( 215 short_block.simulate( 216 self.arch.arch_nr, self.arch.syscalls['read'], 217 short_constant + bias)[1], expectation) 218 219 def test_and_or(self): 220 """Accept lines with a complex expression in DNF.""" 221 block = self._compile('read: arg0 == 0 && arg1 == 0 || arg0 == 1') 222 223 self.assertEqual( 224 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 0, 225 0)[1], 'ALLOW') 226 self.assertEqual( 227 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 0, 228 1)[1], 'KILL_PROCESS') 229 self.assertEqual( 230 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 1, 231 0)[1], 'ALLOW') 232 self.assertEqual( 233 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 1, 234 1)[1], 'ALLOW') 235 236 def test_trap(self): 237 """Accept lines that trap unconditionally.""" 238 block = self._compile('read: trap') 239 240 self.assertEqual( 241 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 242 0)[1], 'TRAP') 243 244 def test_ret_errno(self): 245 """Accept lines that return errno.""" 246 block = self._compile('read : arg0 == 0 || arg0 == 1 ; return 1') 247 248 self.assertEqual( 249 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 250 0)[1:], ('ERRNO', 1)) 251 self.assertEqual( 252 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 253 1)[1:], ('ERRNO', 1)) 254 self.assertEqual( 255 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 256 2)[1], 'KILL_PROCESS') 257 258 def test_ret_errno_unconditionally(self): 259 """Accept lines that return errno unconditionally.""" 260 block = self._compile('read: return 1') 261 262 self.assertEqual( 263 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 264 0)[1:], ('ERRNO', 1)) 265 266 def test_trace(self): 267 """Accept lines that trace unconditionally.""" 268 block = self._compile('read: trace') 269 270 self.assertEqual( 271 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 272 0)[1], 'TRACE') 273 274 def test_log(self): 275 """Accept lines that log unconditionally.""" 276 block = self._compile('read: log') 277 278 self.assertEqual( 279 block.simulate(self.arch.arch_nr, self.arch.syscalls['read'], 280 0)[1], 'LOG') 281 282 def test_mmap_write_xor_exec(self): 283 """Accept the idiomatic filter for mmap.""" 284 block = self._compile( 285 'read : arg0 in ~PROT_WRITE || arg0 in ~PROT_EXEC') 286 287 prot_exec_and_write = 6 288 for prot in range(0, 0xf): 289 if (prot & prot_exec_and_write) == prot_exec_and_write: 290 self.assertEqual( 291 block.simulate(self.arch.arch_nr, 292 self.arch.syscalls['read'], prot)[1], 293 'KILL_PROCESS') 294 else: 295 self.assertEqual( 296 block.simulate(self.arch.arch_nr, 297 self.arch.syscalls['read'], prot)[1], 298 'ALLOW') 299 300 301 class CompileFileTests(unittest.TestCase): 302 """Tests for PolicyCompiler.compile_file.""" 303 304 def setUp(self): 305 self.arch = ARCH_64 306 self.compiler = compiler.PolicyCompiler(self.arch) 307 self.tempdir = tempfile.mkdtemp() 308 309 def tearDown(self): 310 shutil.rmtree(self.tempdir) 311 312 def _write_file(self, filename, contents): 313 """Helper to write out a file for testing.""" 314 path = os.path.join(self.tempdir, filename) 315 with open(path, 'w') as outf: 316 outf.write(contents) 317 return path 318 319 def test_compile(self): 320 """Ensure compilation works with all strategies.""" 321 self._write_file( 322 'test.frequency', """ 323 read: 1 324 close: 10 325 """) 326 path = self._write_file( 327 'test.policy', """ 328 @frequency ./test.frequency 329 read: 1 330 close: 1 331 """) 332 333 program = self.compiler.compile_file( 334 path, 335 optimization_strategy=compiler.OptimizationStrategy.LINEAR, 336 kill_action=bpf.KillProcess()) 337 self.assertGreater( 338 bpf.simulate(program.instructions, self.arch.arch_nr, 339 self.arch.syscalls['read'], 0)[0], 340 bpf.simulate(program.instructions, self.arch.arch_nr, 341 self.arch.syscalls['close'], 0)[0], 342 ) 343 344 def test_compile_bst(self): 345 """Ensure compilation with BST is cheaper than the linear model.""" 346 self._write_file( 347 'test.frequency', """ 348 read: 1 349 close: 10 350 """) 351 path = self._write_file( 352 'test.policy', """ 353 @frequency ./test.frequency 354 read: 1 355 close: 1 356 """) 357 358 for strategy in list(compiler.OptimizationStrategy): 359 program = self.compiler.compile_file( 360 path, 361 optimization_strategy=strategy, 362 kill_action=bpf.KillProcess()) 363 self.assertGreater( 364 bpf.simulate(program.instructions, self.arch.arch_nr, 365 self.arch.syscalls['read'], 0)[0], 366 bpf.simulate(program.instructions, self.arch.arch_nr, 367 self.arch.syscalls['close'], 0)[0], 368 ) 369 self.assertEqual( 370 bpf.simulate(program.instructions, self.arch.arch_nr, 371 self.arch.syscalls['read'], 0)[1], 'ALLOW') 372 self.assertEqual( 373 bpf.simulate(program.instructions, self.arch.arch_nr, 374 self.arch.syscalls['close'], 0)[1], 'ALLOW') 375 376 def test_compile_empty_file(self): 377 """Accept empty files.""" 378 path = self._write_file( 379 'test.policy', """ 380 @default kill-thread 381 """) 382 383 for strategy in list(compiler.OptimizationStrategy): 384 program = self.compiler.compile_file( 385 path, 386 optimization_strategy=strategy, 387 kill_action=bpf.KillProcess()) 388 self.assertEqual( 389 bpf.simulate(program.instructions, self.arch.arch_nr, 390 self.arch.syscalls['read'], 0)[1], 'KILL_THREAD') 391 392 def test_compile_simulate(self): 393 """Ensure policy reflects script by testing some random scripts.""" 394 iterations = 5 395 for i in range(iterations): 396 num_entries = 64 * (i + 1) // iterations 397 syscalls = dict( 398 zip( 399 random.sample(self.arch.syscalls.keys(), num_entries), 400 (random.randint(1, 1024) for _ in range(num_entries)), 401 )) 402 403 frequency_contents = '\n'.join( 404 '%s: %d' % s for s in syscalls.items()) 405 policy_contents = '@frequency ./test.frequency\n' + '\n'.join( 406 '%s: 1' % s[0] for s in syscalls.items()) 407 408 self._write_file('test.frequency', frequency_contents) 409 path = self._write_file('test.policy', policy_contents) 410 411 for strategy in list(compiler.OptimizationStrategy): 412 program = self.compiler.compile_file( 413 path, 414 optimization_strategy=strategy, 415 kill_action=bpf.KillProcess()) 416 for name, number in self.arch.syscalls.items(): 417 expected_result = ('ALLOW' 418 if name in syscalls else 'KILL_PROCESS') 419 self.assertEqual( 420 bpf.simulate(program.instructions, self.arch.arch_nr, 421 number, 0)[1], expected_result, 422 ('syscall name: %s, syscall number: %d, ' 423 'strategy: %s, policy:\n%s') % 424 (name, number, strategy, policy_contents)) 425 426 @unittest.skipIf(not int(os.getenv('SLOW_TESTS', '0')), 'slow') 427 def test_compile_huge_policy(self): 428 """Ensure jumps while compiling a huge policy are still valid.""" 429 # Given that the BST strategy is O(n^3), don't choose a crazy large 430 # value, but it still needs to be around 128 so that we exercise the 431 # codegen paths that depend on the length of the jump. 432 # 433 # Immediate jump offsets in BPF comparison instructions are limited to 434 # 256 instructions, so given that every syscall filter consists of a 435 # load and jump instructions, with 128 syscalls there will be at least 436 # one jump that's further than 256 instructions. 437 num_entries = 128 438 syscalls = dict(random.sample(self.arch.syscalls.items(), num_entries)) 439 # Here we force every single filter to be distinct. Otherwise the 440 # codegen layer will coalesce filters that compile to the same 441 # instructions. 442 policy_contents = '\n'.join( 443 '%s: arg0 == %d' % s for s in syscalls.items()) 444 445 path = self._write_file('test.policy', policy_contents) 446 447 program = self.compiler.compile_file( 448 path, 449 optimization_strategy=compiler.OptimizationStrategy.BST, 450 kill_action=bpf.KillProcess()) 451 for name, number in self.arch.syscalls.items(): 452 expected_result = ('ALLOW' 453 if name in syscalls else 'KILL_PROCESS') 454 self.assertEqual( 455 bpf.simulate(program.instructions, self.arch.arch_nr, 456 self.arch.syscalls[name], number)[1], 457 expected_result) 458 self.assertEqual( 459 bpf.simulate(program.instructions, self.arch.arch_nr, 460 self.arch.syscalls[name], number + 1)[1], 461 'KILL_PROCESS') 462 463 def test_compile_huge_filter(self): 464 """Ensure jumps while compiling a huge policy are still valid.""" 465 # This is intended to force cases where the AST visitation would result 466 # in a combinatorial explosion of calls to Block.accept(). An optimized 467 # implementation should be O(n). 468 num_entries = 128 469 syscalls = {} 470 # Here we force every single filter to be distinct. Otherwise the 471 # codegen layer will coalesce filters that compile to the same 472 # instructions. 473 policy_contents = [] 474 for name in random.sample(self.arch.syscalls.keys(), num_entries): 475 values = random.sample(range(1024), num_entries) 476 syscalls[name] = values 477 policy_contents.append( 478 '%s: %s' % (name, ' || '.join('arg0 == %d' % value 479 for value in values))) 480 481 path = self._write_file('test.policy', '\n'.join(policy_contents)) 482 483 program = self.compiler.compile_file( 484 path, 485 optimization_strategy=compiler.OptimizationStrategy.LINEAR, 486 kill_action=bpf.KillProcess()) 487 for name, values in syscalls.items(): 488 self.assertEqual( 489 bpf.simulate(program.instructions, 490 self.arch.arch_nr, self.arch.syscalls[name], 491 random.choice(values))[1], 'ALLOW') 492 self.assertEqual( 493 bpf.simulate(program.instructions, self.arch.arch_nr, 494 self.arch.syscalls[name], 1025)[1], 495 'KILL_PROCESS') 496 497 498 if __name__ == '__main__': 499 unittest.main() 500