Home | History | Annotate | Download | only in platform_Firewall
      1 # Copyright 2015 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import dbus
      6 import logging
      7 import os
      8 import time
      9 
     10 from autotest_lib.client.bin import test, utils
     11 from autotest_lib.client.common_lib import error
     12 
     13 
     14 class platform_Firewall(test.test):
     15     """Ensure the firewall service is working correctly."""
     16 
     17     version = 1
     18 
     19     _PORT = 1234
     20     _IFACE = "eth0"
     21 
     22     _TCP_RULE = "-A INPUT -p tcp -m tcp --dport %d -j ACCEPT" % _PORT
     23     _UDP_RULE = "-A INPUT -p udp -m udp --dport %d -j ACCEPT" % _PORT
     24     _IFACE_RULE = "-A INPUT -i %s -p tcp -m tcp --dport %d -j ACCEPT" % (_IFACE,
     25                                                                          _PORT)
     26 
     27     _POLL_INTERVAL = 5
     28 
     29     _IPTABLES_DEL_CMD = "%s -D INPUT -p %s -m %s --dport %d -j ACCEPT"
     30 
     31     @staticmethod
     32     def _iptables_rules(executable):
     33         rule_output = utils.system_output("%s -S" % executable)
     34         logging.debug(rule_output)
     35         return [line.strip() for line in rule_output.splitlines()]
     36 
     37 
     38     @staticmethod
     39     def _check(expected_rule, actual_rules, error_msg, executable, check):
     40         # If check() returns false, fail the test.
     41         if not check(expected_rule, actual_rules):
     42             raise error.TestFail(error_msg % executable)
     43 
     44 
     45     @staticmethod
     46     def _check_included(expected_rule, actual_rules, error_msg, executable):
     47         # Test whether the rule is included, fail if it's not.
     48         platform_Firewall._check(
     49                 expected_rule, actual_rules, error_msg, executable,
     50                 lambda e, a: e in a)
     51 
     52 
     53     @staticmethod
     54     def _check_not_included(expected_rule, actual_rules, error_msg, executable):
     55         # Test whether the rule is not included, fail if it is.
     56         platform_Firewall._check(
     57                 expected_rule, actual_rules, error_msg, executable,
     58                 lambda e, a: e not in a)
     59 
     60 
     61     def run_once(self):
     62         # Create lifeline file descriptors.
     63         self.tcp_r, self.tcp_w = os.pipe()
     64         self.udp_r, self.udp_w = os.pipe()
     65         self.iface_r, self.iface_w = os.pipe()
     66 
     67         try:
     68             bus = dbus.SystemBus()
     69             pb_proxy = bus.get_object('org.chromium.PermissionBroker',
     70                                       '/org/chromium/PermissionBroker')
     71             pb = dbus.Interface(pb_proxy, 'org.chromium.PermissionBroker')
     72 
     73             tcp_lifeline = dbus.types.UnixFd(self.tcp_r)
     74             ret = pb.RequestTcpPortAccess(dbus.UInt16(self._PORT), "",
     75                                           tcp_lifeline)
     76             # |ret| is a dbus.Boolean, but compares as int.
     77             if ret == 0:
     78                 raise error.TestFail("RequestTcpPortAccess returned false.")
     79 
     80             udp_lifeline = dbus.types.UnixFd(self.udp_r)
     81             ret = pb.RequestUdpPortAccess(dbus.UInt16(self._PORT), "",
     82                                           udp_lifeline)
     83             # |ret| is a dbus.Boolean, but compares as int.
     84             if ret == 0:
     85                 raise error.TestFail("RequestUdpPortAccess returned false.")
     86 
     87             iface_lifeline = dbus.types.UnixFd(self.iface_r)
     88             ret = pb.RequestTcpPortAccess(dbus.UInt16(self._PORT),
     89                                           dbus.String(self._IFACE),
     90                                           iface_lifeline)
     91             # |ret| is a dbus.Boolean, but compares as int.
     92             if ret == 0:
     93                 raise error.TestFail(
     94                         "RequestTcpPortAccess(port, interface) returned false.")
     95 
     96             # Test IPv4 and IPv6.
     97             for executable in ["iptables", "ip6tables"]:
     98                 actual_rules = self._iptables_rules(executable)
     99                 self._check_included(
    100                         self._TCP_RULE, actual_rules,
    101                         "RequestTcpPortAccess did not add %s rule.",
    102                         executable)
    103                 self._check_included(
    104                         self._UDP_RULE, actual_rules,
    105                         "RequestUdpPortAccess did not add %s rule.",
    106                         executable)
    107                 self._check_included(
    108                         self._IFACE_RULE, actual_rules,
    109                         "RequestTcpPortAccess(port, interface)"
    110                         " did not add %s rule.",
    111                         executable)
    112 
    113             ret = pb.ReleaseTcpPort(dbus.UInt16(self._PORT), "")
    114             # |ret| is a dbus.Boolean, but compares as int.
    115             if ret == 0:
    116                 raise error.TestFail("ReleaseTcpPort returned false.")
    117 
    118             ret = pb.ReleaseUdpPort(dbus.UInt16(self._PORT), "")
    119             # |ret| is a dbus.Boolean, but compares as int.
    120             if ret == 0:
    121                 raise error.TestFail("ReleaseUdpPort returned false.")
    122 
    123             # Test IPv4 and IPv6.
    124             for executable in ["iptables", "ip6tables"]:
    125                 rules = self._iptables_rules(executable)
    126                 self._check_not_included(
    127                         self._TCP_RULE, rules,
    128                         "ReleaseTcpPortAccess did not remove %s rule.",
    129                         executable)
    130                 self._check_not_included(
    131                         self._UDP_RULE, rules,
    132                         "ReleaseUdpPortAccess did not remove %s rule.",
    133                         executable)
    134 
    135             # permission_broker should plug the firewall hole
    136             # when the requesting process exits.
    137             # Simulate the process exiting by closing |iface_w|.
    138             os.close(self.iface_w)
    139 
    140             # permission_broker checks every |_POLL_INTERVAL| seconds
    141             # for processes that have exited.
    142             # This is ugly, but it's either this or polling /var/log/messages.
    143             time.sleep(self._POLL_INTERVAL + 1)
    144             # Test IPv4 and IPv6.
    145             for executable in ["iptables", "ip6tables"]:
    146                 rules = self._iptables_rules(executable)
    147                 self._check_not_included(
    148                         self._IFACE_RULE, rules,
    149                         "permission_broker did not remove %s rule.",
    150                         executable)
    151 
    152         except dbus.DBusException as e:
    153             raise error.TestFail("D-Bus error: " + e.get_dbus_message())
    154 
    155 
    156     def cleanup(self):
    157         # File descriptors could already be closed.
    158         try:
    159             os.close(self.tcp_w)
    160             os.close(self.udp_w)
    161             os.close(self.iface_w)
    162         except OSError:
    163             pass
    164 
    165         # We don't want the cleanup() method to fail, so we ignore exit codes.
    166         # This also allows us to clean up iptables rules unconditionally.
    167         # The command will fail if the rule has already been deleted,
    168         # but it won't fail the test.
    169         for executable in ["iptables", "ip6tables"]:
    170             cmd = self._IPTABLES_DEL_CMD % (executable, "tcp", "tcp",
    171                                             self._PORT)
    172             utils.system(cmd, ignore_status=True)
    173             cmd = self._IPTABLES_DEL_CMD % (executable, "udp", "udp",
    174                                             self._PORT)
    175             utils.system(cmd, ignore_status=True)
    176             cmd = self._IPTABLES_DEL_CMD % (executable, "tcp", "tcp",
    177                                             self._PORT)
    178             cmd += " -i %s" % self._IFACE
    179             utils.system(cmd, ignore_status=True)
    180