Home | History | Annotate | Download | only in netprotos
      1 # Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import unittest
      6 
      7 import dpkt
      8 import fake_host
      9 import socket
     10 import zeroconf
     11 
     12 
     13 FAKE_HOSTNAME = 'fakehost1'
     14 
     15 FAKE_IPADDR = '192.168.11.22'
     16 
     17 
     18 class TestZeroconfDaemon(unittest.TestCase):
     19     """Test class for ZeroconfDaemon."""
     20 
     21     def setUp(self):
     22         self._host = fake_host.FakeHost(FAKE_IPADDR)
     23         self._zero = zeroconf.ZeroconfDaemon(self._host, FAKE_HOSTNAME)
     24 
     25 
     26     def _query_A(self, name):
     27         """Returns the list of A records matching the given name.
     28 
     29         @param name: A domain name.
     30         @return a list of dpkt.dns.DNS.RR objects, one for each matching record.
     31         """
     32         q = dpkt.dns.DNS.Q(name=name, type=dpkt.dns.DNS_A)
     33         return self._zero._process_A(q)
     34 
     35 
     36     def testRegisterService(self):
     37         """Tests that we get appropriate records after registering a service."""
     38         SERVICE_PORT = 9
     39         SERVICE_TXT_LIST = ['lies=lies']
     40         self._zero.register_service('unique_prefix', '_service_type',
     41                                     '_tcp', SERVICE_PORT, SERVICE_TXT_LIST)
     42         name = '_service_type._tcp.local'
     43         fq_name = 'unique_prefix.' + name
     44         # Issue SRV, PTR, and TXT queries
     45         q_srv = dpkt.dns.DNS.Q(name=fq_name, type=dpkt.dns.DNS_SRV)
     46         q_txt = dpkt.dns.DNS.Q(name=fq_name, type=dpkt.dns.DNS_TXT)
     47         q_ptr = dpkt.dns.DNS.Q(name=name, type=dpkt.dns.DNS_PTR)
     48         ptr_responses = self._zero._process_PTR(q_ptr)
     49         srv_responses = self._zero._process_SRV(q_srv)
     50         txt_responses = self._zero._process_TXT(q_txt)
     51         self.assertTrue(ptr_responses)
     52         self.assertTrue(srv_responses)
     53         self.assertTrue(txt_responses)
     54         ptr_resp = ptr_responses[0]
     55         srv_resp = [resp for resp in srv_responses
     56                     if resp.type == dpkt.dns.DNS_SRV][0]
     57         txt_resp = txt_responses[0]
     58         # Check that basic things are right.
     59         self.assertEqual(fq_name, ptr_resp.ptrname)
     60         self.assertEqual(FAKE_HOSTNAME + '.' + self._zero.domain,
     61                          srv_resp.srvname)
     62         self.assertEqual(SERVICE_PORT, srv_resp.port)
     63         self.assertEqual(SERVICE_TXT_LIST, txt_resp.text)
     64 
     65 
     66     def testProperties(self):
     67         """Test the initial properties set by the constructor."""
     68         self.assertEqual(self._zero.host, self._host)
     69         self.assertEqual(self._zero.hostname, FAKE_HOSTNAME)
     70         self.assertEqual(self._zero.domain, 'local') # Default domain
     71         self.assertEqual(self._zero.full_hostname, FAKE_HOSTNAME + '.local')
     72 
     73 
     74     def testSocketInit(self):
     75         """Test that the constructor listens for mDNS traffic."""
     76 
     77         # Should create an UDP socket and bind it to the mDNS address and port.
     78         self.assertEqual(len(self._host._sockets), 1)
     79         sock = self._host._sockets[0]
     80 
     81         self.assertEqual(sock._family, socket.AF_INET) # IPv4
     82         self.assertEqual(sock._sock_type, socket.SOCK_DGRAM) # UDP
     83 
     84         # Check it is listening for UDP packets on the mDNS address and port.
     85         self.assertTrue(sock._bound)
     86         self.assertEqual(sock._bind_ip_addr, '224.0.0.251') # mDNS address
     87         self.assertEqual(sock._bind_port, 5353) # mDNS port
     88         self.assertTrue(callable(sock._bind_recv_callback))
     89 
     90 
     91     def testRecordsInit(self):
     92         """Test the A record of the host is registered."""
     93         host_A = self._query_A(self._zero.full_hostname)
     94         self.assertGreater(len(host_A), 0)
     95 
     96         record = host_A[0]
     97         # Check the hostname and the packed IP address.
     98         self.assertEqual(record.name, self._zero.full_hostname)
     99         self.assertEqual(record.ip, socket.inet_aton(self._host.ip_addr))
    100 
    101 
    102     def testDoubleTXTProcessing(self):
    103         """Test when more than one TXT record is present in a packet.
    104 
    105         A mDNS packet can include several answer records for several domains and
    106         record type. A corner case found on the field presents a mDNS packet
    107         with two TXT records for the same domain name on the same packet on its
    108         authoritative answers section while the packet itself is a query.
    109         """
    110         # Build the mDNS packet with two TXT records.
    111         domain_name = 'other_host.local'
    112         answers = [
    113                 dpkt.dns.DNS.RR(
    114                         type = dpkt.dns.DNS_TXT,
    115                         cls = dpkt.dns.DNS_IN,
    116                         ttl = 120,
    117                         name = domain_name,
    118                         text = ['one', 'two']),
    119                 dpkt.dns.DNS.RR(
    120                         type = dpkt.dns.DNS_TXT,
    121                         cls = dpkt.dns.DNS_IN,
    122                         ttl = 120,
    123                         name = domain_name,
    124                         text = ['two'])]
    125         # The packet is a query packet, with extra answers on the autoritative
    126         # section.
    127         mdns = dpkt.dns.DNS(
    128                 op = dpkt.dns.DNS_QUERY, # Standard query
    129                 rcode = dpkt.dns.DNS_RCODE_NOERR,
    130                 q = [],
    131                 an = [],
    132                 ns = answers)
    133 
    134         # Record the new answers received on the answer_calls list.
    135         answer_calls = []
    136         self._zero.add_answer_observer(lambda args: answer_calls.extend(args))
    137 
    138         # Send the packet to the registered callback.
    139         sock = self._host._sockets[0]
    140         cbk = sock._bind_recv_callback
    141         cbk(str(mdns), '1234', 5353)
    142 
    143         # Check that the answers callback is called with all the answers in the
    144         # received order.
    145         self.assertEqual(len(answer_calls), 2)
    146         ans1, ans2 = answer_calls # Each ans is a (rrtype, rrname, data)
    147         self.assertEqual(ans1[2], ('one', 'two'))
    148         self.assertEqual(ans2[2], ('two',))
    149 
    150         # Check that the two records were cached.
    151         records = self._zero.cached_results(domain_name, dpkt.dns.DNS_TXT)
    152         self.assertEqual(len(records), 2)
    153 
    154 
    155 if __name__ == '__main__':
    156     unittest.main()
    157