Home | History | Annotate | Download | only in cros
      1 # Copyright 2018 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 """Return information about routing table entries
      6 
      7 Read and parse the system routing table. There are
      8 four classes defined here: NetworkRoutes, which contains
      9 information about all routes; IPv4Route, which describes
     10 a single IPv4 routing table entry; IPv6Route, which
     11 does the same for IPv6; and Route, which has common code
     12 for IPv4Route and IPv6Route.
     13 """
     14 
     15 ROUTES_V4_FILE = "/proc/net/route"
     16 ROUTES_V6_FILE = "/proc/net/ipv6_route"
     17 
     18 # The following constants are from <net/route.h>
     19 RTF_UP      = 0x0001
     20 RTF_GATEWAY = 0x0002
     21 RTF_HOST    = 0x0004
     22 # IPv6 constants from <net/route.h>
     23 RTF_DEFAULT = 0x10000
     24 
     25 import socket
     26 import struct
     27 
     28 class Route(object):
     29     def __init__(self, iface, dest, gway, flags, mask):
     30         self.interface = iface
     31         self.destination = dest
     32         self.gateway = gway
     33         self.flagbits = flags
     34         self.netmask = mask
     35 
     36     def __str__(self):
     37         flags = ""
     38         if self.flagbits & RTF_UP:
     39             flags += "U"
     40         if self.flagbits & RTF_GATEWAY:
     41             flags += "G"
     42         if self.flagbits & RTF_HOST:
     43             flags += "H"
     44         if self.flagbits & RTF_DEFAULT:
     45             flags += "D"
     46         return "<%s dest: %s gway: %s mask: %s flags: %s>" % (
     47                 self.interface,
     48                 self._intToIp(self.destination),
     49                 self._intToIp(self.gateway),
     50                 self._intToIp(self.netmask),
     51                 flags)
     52 
     53     def isUsable(self):
     54         return self.flagbits & RTF_UP
     55 
     56     def isHostRoute(self):
     57         return self.flagbits & RTF_HOST
     58 
     59     def isGatewayRoute(self):
     60         return self.flagbits & RTF_GATEWAY
     61 
     62     def isInterfaceRoute(self):
     63         return (self.flagbits & RTF_GATEWAY) == 0
     64 
     65     def matches(self, ip):
     66         try:
     67             return (self._ipToInt(ip) & self.netmask) == self.destination
     68         except socket.error:
     69             return False
     70 
     71 
     72 class IPv4Route(Route):
     73     def __init__(self, iface, dest, gway, flags, mask):
     74         super(IPv4Route, self).__init__(
     75             iface, int(dest, 16), int(gway, 16), int(flags, 16), int(mask, 16))
     76 
     77     def _intToIp(self, addr):
     78         return socket.inet_ntoa(struct.pack('@I', addr))
     79 
     80     def _ipToInt(self, ip):
     81         return struct.unpack('I', socket.inet_aton(ip))[0]
     82 
     83     def isDefaultRoute(self):
     84         return (self.flagbits & RTF_GATEWAY) and self.destination == 0
     85 
     86 def parseIPv4Routes(routelist):
     87     # The first line is headers that will allow us
     88     # to correctly interpret the values in the following
     89     # lines
     90     headers = routelist[0].split()
     91     col_map = {token: pos for (pos, token) in enumerate(headers)}
     92 
     93     routes = []
     94     for routeline in routelist[1:]:
     95         route = routeline.split()
     96         interface = route[col_map["Iface"]]
     97         destination = route[col_map["Destination"]]
     98         gateway = route[col_map["Gateway"]]
     99         flags = route[col_map["Flags"]]
    100         mask = route[col_map["Mask"]]
    101         routes.append(IPv4Route(interface, destination, gateway, flags, mask))
    102 
    103     return routes
    104 
    105 
    106 class IPv6Route(Route):
    107     def __init__(self, iface, dest, gway, flags, plen):
    108         super(IPv6Route, self).__init__(
    109             iface,
    110             long(dest, 16),
    111             long(gway, 16),
    112             long(flags, 16),
    113             # netmask = set first plen bits to 1, all following to 0
    114             (1 << 128) - (1 << (128 - int(plen, 16))))
    115 
    116     def _intToIp(self, addr):
    117         return socket.inet_ntop(socket.AF_INET6, ("%032x" % addr).decode("hex"))
    118 
    119     def _ipToInt(self, ip):
    120         return long(socket.inet_pton(socket.AF_INET6, ip).encode("hex"), 16)
    121 
    122     def isDefaultRoute(self):
    123         return self.flagbits & RTF_DEFAULT
    124 
    125 def parseIPv6Routes(routelist):
    126     # ipv6_route has no headers, so the routing table looks like the following:
    127     # Dest DestPrefix Src SrcPrefix Gateway Metric RefCnt UseCnt Flags Iface
    128     routes = []
    129     for routeline in routelist:
    130         route = routeline.split()
    131         interface = route[9]
    132         destination = route[0]
    133         gateway = route[4]
    134         flags = route[8]
    135         prefix = route[1]
    136         routes.append(IPv6Route(interface, destination, gateway, flags, prefix))
    137 
    138     return routes
    139 
    140 
    141 class NetworkRoutes(object):
    142     def __init__(self, routelist_v4=None, routelist_v6=None):
    143         if routelist_v4 is None:
    144             with open(ROUTES_V4_FILE) as routef_v4:
    145                 routelist_v4 = routef_v4.readlines()
    146 
    147         self.routes = parseIPv4Routes(routelist_v4)
    148 
    149         if routelist_v6 is None:
    150             with open(ROUTES_V6_FILE) as routef_v6:
    151                 routelist_v6 = routef_v6.readlines()
    152 
    153         self.routes += parseIPv6Routes(routelist_v6)
    154 
    155     def _filterUsableRoutes(self):
    156         return (rr for rr in self.routes if rr.isUsable())
    157 
    158     def hasDefaultRoute(self, interface):
    159         return any(rr for rr in self._filterUsableRoutes()
    160                    if (rr.interface == interface and rr.isDefaultRoute()))
    161 
    162     def getDefaultRoutes(self):
    163         return [rr for rr in self._filterUsableRoutes() if rr.isDefaultRoute()]
    164 
    165     def hasInterfaceRoute(self, interface):
    166         return any(rr for rr in self._filterUsableRoutes()
    167                    if (rr.interface == interface and rr.isInterfaceRoute()))
    168 
    169     def getRouteFor(self, ip):
    170         for rr in self._filterUsableRoutes():
    171             if rr.matches(ip):
    172                 return rr
    173         return None
    174