Home | History | Annotate | Download | only in security_Firewall
      1 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import logging
      6 import os
      7 
      8 from autotest_lib.client.bin import test
      9 from autotest_lib.client.common_lib import error
     10 from autotest_lib.client.common_lib import utils
     11 from autotest_lib.client.common_lib.cros.tendo import webservd_helper
     12 
     13 
     14 class security_Firewall(test.test):
     15     """Tests that rules in iptables/ip6tables match our expectations exactly."""
     16     version = 1
     17 
     18 
     19     @staticmethod
     20     def get_firewall_settings(executable):
     21         rules = utils.system_output("%s -S" % executable)
     22         return set([line.strip() for line in rules.splitlines()])
     23 
     24 
     25     def load_baseline(self, baseline_filename):
     26         """The baseline file lists the rules that we expect.
     27 
     28         @param baseline_filename: string name of file containing relevant rules.
     29         """
     30         baseline_path = os.path.join(self.bindir, baseline_filename)
     31         with open(baseline_path) as f:
     32             return set([line.strip() for line in f.readlines()])
     33 
     34 
     35     def dump_rules(self, rules, executable):
     36         """Store actual rules in results/ for future use.
     37 
     38         Leaves a list of iptables/ip6tables rules in the results dir
     39         so that we can update the baseline file if necessary.
     40 
     41         @param rules: list of string containing rules we found on the board.
     42         @param executable: 'iptables' for IPv4 or 'ip6tables' for IPv6.
     43         """
     44         outf = open(os.path.join(self.resultsdir, "%s_rules" % executable), 'w')
     45         for rule in rules:
     46             outf.write(rule + "\n")
     47 
     48         outf.close()
     49 
     50 
     51     @staticmethod
     52     def log_error_rules(rules, message):
     53         """Log a set of rules and the problem with those rules.
     54 
     55         @param rules: list of string containing rules we have issues with.
     56         @param message: string detailing what our problem with the rules is.
     57         """
     58         rules_str = ", ".join(["'%s'" % rule for rule in rules])
     59         logging.error("%s: %s", message, rules_str)
     60 
     61 
     62     def run_once(self):
     63         """Matches found and expected iptables/ip6tables rules.
     64         Fails only when rules are missing.
     65         """
     66 
     67         failed = False
     68         for executable in ["iptables", "ip6tables"]:
     69             baseline = self.load_baseline("baseline.%s" % executable)
     70             # TODO(wiley) Remove when we get per-board baselines (crbug.com/406013)
     71             webserv_rules = self.load_baseline("baseline.webservd")
     72             if webservd_helper.webservd_is_running():
     73                 baseline.update(webserv_rules)
     74             current = self.get_firewall_settings(executable)
     75 
     76             # Save to results dir
     77             self.dump_rules(current, executable)
     78 
     79             missing_rules = baseline - current
     80             extra_rules = current - baseline
     81 
     82             if len(missing_rules) > 0:
     83                 failed = True
     84                 self.log_error_rules(missing_rules,
     85                                      "Missing %s rules" % executable)
     86 
     87             if len(extra_rules) > 0:
     88                 # TODO(zqiu): implement a way to verify per-interface rules
     89                 # that are created dynamically.
     90                 self.log_error_rules(extra_rules, "Extra %s rules" % executable)
     91 
     92         if failed:
     93             raise error.TestFail("Mismatched firewall rules")
     94