Home | History | Annotate | Download | only in testserver
      1 #!/usr/bin/python2.4
      2 # Copyright (c) 2010 The Chromium Authors. All rights reserved.
      3 # Use of this source code is governed by a BSD-style license that can be
      4 # found in the LICENSE file.
      5 
      6 """A bare-bones and non-compliant XMPP server.
      7 
      8 Just enough of the protocol is implemented to get it to work with
      9 Chrome's sync notification system.
     10 """
     11 
     12 import asynchat
     13 import asyncore
     14 import base64
     15 import re
     16 import socket
     17 from xml.dom import minidom
     18 
# pychecker complains about the use of fileno(), which is implemented
# by asyncore by forwarding to an internal object via __getattr__.
# Module-level pychecker option; 'no-classattr' is intended to silence
# that class-attribute complaint for this file.
__pychecker__ = 'no-classattr'
     22 
     23 
     24 class Error(Exception):
     25   """Error class for this module."""
     26   pass
     27 
     28 
     29 class UnexpectedXml(Error):
     30   """Raised when an unexpected XML element has been encountered."""
     31 
     32   def __init__(self, xml_element):
     33     xml_text = xml_element.toxml()
     34     Error.__init__(self, 'Unexpected XML element', xml_text)
     35 
     36 
     37 def ParseXml(xml_string):
     38   """Parses the given string as XML and returns a minidom element
     39   object.
     40   """
     41   dom = minidom.parseString(xml_string)
     42 
     43   # minidom handles xmlns specially, but there's a bug where it sets
     44   # the attribute value to None, which causes toxml() or toprettyxml()
     45   # to break.
     46   def FixMinidomXmlnsBug(xml_element):
     47     if xml_element.getAttribute('xmlns') is None:
     48       xml_element.setAttribute('xmlns', '')
     49 
     50   def ApplyToAllDescendantElements(xml_element, fn):
     51     fn(xml_element)
     52     for node in xml_element.childNodes:
     53       if node.nodeType == node.ELEMENT_NODE:
     54         ApplyToAllDescendantElements(node, fn)
     55 
     56   root = dom.documentElement
     57   ApplyToAllDescendantElements(root, FixMinidomXmlnsBug)
     58   return root
     59 
     60 
     61 def CloneXml(xml):
     62   """Returns a deep copy of the given XML element.
     63 
     64   Args:
     65     xml: The XML element, which should be something returned from
     66          ParseXml() (i.e., a root element).
     67   """
     68   return xml.ownerDocument.cloneNode(True).documentElement
     69 
     70 
     71 class StanzaParser(object):
     72   """A hacky incremental XML parser.
     73 
     74   StanzaParser consumes data incrementally via FeedString() and feeds
     75   its delegate complete parsed stanzas (i.e., XML documents) via
     76   FeedStanza().  Any stanzas passed to FeedStanza() are unlinked after
     77   the callback is done.
     78 
     79   Use like so:
     80 
     81   class MyClass(object):
     82     ...
     83     def __init__(self, ...):
     84       ...
     85       self._parser = StanzaParser(self)
     86       ...
     87 
     88     def SomeFunction(self, ...):
     89       ...
     90       self._parser.FeedString(some_data)
     91       ...
     92 
     93     def FeedStanza(self, stanza):
     94       ...
     95       print stanza.toprettyxml()
     96       ...
     97   """
     98 
     99   # NOTE(akalin): The following regexps are naive, but necessary since
    100   # none of the existing Python 2.4/2.5 XML libraries support
    101   # incremental parsing.  This works well enough for our purposes.
    102   #
    103   # The regexps below assume that any present XML element starts at
    104   # the beginning of the string, but there may be trailing whitespace.
    105 
    106   # Matches an opening stream tag (e.g., '<stream:stream foo="bar">')
    107   # (assumes that the stream XML namespace is defined in the tag).
    108   _stream_re = re.compile(r'^(<stream:stream [^>]*>)\s*')
    109 
    110   # Matches an empty element tag (e.g., '<foo bar="baz"/>').
    111   _empty_element_re = re.compile(r'^(<[^>]*/>)\s*')
    112 
    113   # Matches a non-empty element (e.g., '<foo bar="baz">quux</foo>').
    114   # Does *not* handle nested elements.
    115   _non_empty_element_re = re.compile(r'^(<([^ >]*)[^>]*>.*?</\2>)\s*')
    116 
    117   # The closing tag for a stream tag.  We have to insert this
    118   # ourselves since all XML stanzas are children of the stream tag,
    119   # which is never closed until the connection is closed.
    120   _stream_suffix = '</stream:stream>'
    121 
    122   def __init__(self, delegate):
    123     self._buffer = ''
    124     self._delegate = delegate
    125 
    126   def FeedString(self, data):
    127     """Consumes the given string data, possibly feeding one or more
    128     stanzas to the delegate.
    129     """
    130     self._buffer += data
    131     while (self._ProcessBuffer(self._stream_re, self._stream_suffix) or
    132            self._ProcessBuffer(self._empty_element_re) or
    133            self._ProcessBuffer(self._non_empty_element_re)):
    134       pass
    135 
    136   def _ProcessBuffer(self, regexp, xml_suffix=''):
    137     """If the buffer matches the given regexp, removes the match from
    138     the buffer, appends the given suffix, parses it, and feeds it to
    139     the delegate.
    140 
    141     Returns:
    142       Whether or not the buffer matched the given regexp.
    143     """
    144     results = regexp.match(self._buffer)
    145     if not results:
    146       return False
    147     xml_text = self._buffer[:results.end()] + xml_suffix
    148     self._buffer = self._buffer[results.end():]
    149     stanza = ParseXml(xml_text)
    150     self._delegate.FeedStanza(stanza)
    151     # Needed because stanza may have cycles.
    152     stanza.unlink()
    153     return True
    154 
    155 
    156 class Jid(object):
    157   """Simple struct for an XMPP jid (essentially an e-mail address with
    158   an optional resource string).
    159   """
    160 
    161   def __init__(self, username, domain, resource=''):
    162     self.username = username
    163     self.domain = domain
    164     self.resource = resource
    165 
    166   def __str__(self):
    167     jid_str = "%s@%s" % (self.username, self.domain)
    168     if self.resource:
    169       jid_str += '/' + self.resource
    170     return jid_str
    171 
    172   def GetBareJid(self):
    173     return Jid(self.username, self.domain)
    174 
    175 
    176 class IdGenerator(object):
    177   """Simple class to generate unique IDs for XMPP messages."""
    178 
    179   def __init__(self, prefix):
    180     self._prefix = prefix
    181     self._id = 0
    182 
    183   def GetNextId(self):
    184     next_id = "%s.%s" % (self._prefix, self._id)
    185     self._id += 1
    186     return next_id
    187 
    188 
    189 class HandshakeTask(object):
    190   """Class to handle the initial handshake with a connected XMPP
    191   client.
    192   """
    193 
    194   # The handshake states in order.
    195   (_INITIAL_STREAM_NEEDED,
    196    _AUTH_NEEDED,
    197    _AUTH_STREAM_NEEDED,
    198    _BIND_NEEDED,
    199    _SESSION_NEEDED,
    200    _FINISHED) = range(6)
    201 
    202   # Used when in the _INITIAL_STREAM_NEEDED and _AUTH_STREAM_NEEDED
    203   # states.  Not an XML object as it's only the opening tag.
    204   #
    205   # The from and id attributes are filled in later.
    206   _STREAM_DATA = (
    207     '<stream:stream from="%s" id="%s" '
    208     'version="1.0" xmlns:stream="http://etherx.jabber.org/streams" '
    209     'xmlns="jabber:client">')
    210 
    211   # Used when in the _INITIAL_STREAM_NEEDED state.
    212   _AUTH_STANZA = ParseXml(
    213     '<stream:features xmlns:stream="http://etherx.jabber.org/streams">'
    214     '  <mechanisms xmlns="urn:ietf:params:xml:ns:xmpp-sasl">'
    215     '    <mechanism>PLAIN</mechanism>'
    216     '    <mechanism>X-GOOGLE-TOKEN</mechanism>'
    217     '  </mechanisms>'
    218     '</stream:features>')
    219 
    220   # Used when in the _AUTH_NEEDED state.
    221   _AUTH_SUCCESS_STANZA = ParseXml(
    222     '<success xmlns="urn:ietf:params:xml:ns:xmpp-sasl"/>')
    223 
    224   # Used when in the _AUTH_STREAM_NEEDED state.
    225   _BIND_STANZA = ParseXml(
    226     '<stream:features xmlns:stream="http://etherx.jabber.org/streams">'
    227     '  <bind xmlns="urn:ietf:params:xml:ns:xmpp-bind"/>'
    228     '  <session xmlns="urn:ietf:params:xml:ns:xmpp-session"/>'
    229     '</stream:features>')
    230 
    231   # Used when in the _BIND_NEEDED state.
    232   #
    233   # The id and jid attributes are filled in later.
    234   _BIND_RESULT_STANZA = ParseXml(
    235     '<iq id="" type="result">'
    236     '  <bind xmlns="urn:ietf:params:xml:ns:xmpp-bind">'
    237     '    <jid/>'
    238     '  </bind>'
    239     '</iq>')
    240 
    241   # Used when in the _SESSION_NEEDED state.
    242   #
    243   # The id attribute is filled in later.
    244   _IQ_RESPONSE_STANZA = ParseXml('<iq id="" type="result"/>')
    245 
    246   def __init__(self, connection, resource_prefix):
    247     self._connection = connection
    248     self._id_generator = IdGenerator(resource_prefix)
    249     self._username = ''
    250     self._domain = ''
    251     self._jid = None
    252     self._resource_prefix = resource_prefix
    253     self._state = self._INITIAL_STREAM_NEEDED
    254 
    255   def FeedStanza(self, stanza):
    256     """Inspects the given stanza and changes the handshake state if needed.
    257 
    258     Called when a stanza is received from the client.  Inspects the
    259     stanza to make sure it has the expected attributes given the
    260     current state, advances the state if needed, and sends a reply to
    261     the client if needed.
    262     """
    263     def ExpectStanza(stanza, name):
    264       if stanza.tagName != name:
    265         raise UnexpectedXml(stanza)
    266 
    267     def ExpectIq(stanza, type, name):
    268       ExpectStanza(stanza, 'iq')
    269       if (stanza.getAttribute('type') != type or
    270           stanza.firstChild.tagName != name):
    271         raise UnexpectedXml(stanza)
    272 
    273     def GetStanzaId(stanza):
    274       return stanza.getAttribute('id')
    275 
    276     def HandleStream(stanza):
    277       ExpectStanza(stanza, 'stream:stream')
    278       domain = stanza.getAttribute('to')
    279       if domain:
    280         self._domain = domain
    281       SendStreamData()
    282 
    283     def SendStreamData():
    284       next_id = self._id_generator.GetNextId()
    285       stream_data = self._STREAM_DATA % (self._domain, next_id)
    286       self._connection.SendData(stream_data)
    287 
    288     def GetUserDomain(stanza):
    289       encoded_username_password = stanza.firstChild.data
    290       username_password = base64.b64decode(encoded_username_password)
    291       (_, username_domain, _) = username_password.split('\0')
    292       # The domain may be omitted.
    293       #
    294       # If we were using python 2.5, we'd be able to do:
    295       #
    296       #   username, _, domain = username_domain.partition('@')
    297       #   if not domain:
    298       #     domain = self._domain
    299       at_pos = username_domain.find('@')
    300       if at_pos != -1:
    301         username = username_domain[:at_pos]
    302         domain = username_domain[at_pos+1:]
    303       else:
    304         username = username_domain
    305         domain = self._domain
    306       return (username, domain)
    307 
    308     if self._state == self._INITIAL_STREAM_NEEDED:
    309       HandleStream(stanza)
    310       self._connection.SendStanza(self._AUTH_STANZA, False)
    311       self._state = self._AUTH_NEEDED
    312 
    313     elif self._state == self._AUTH_NEEDED:
    314       ExpectStanza(stanza, 'auth')
    315       (self._username, self._domain) = GetUserDomain(stanza)
    316       self._connection.SendStanza(self._AUTH_SUCCESS_STANZA, False)
    317       self._state = self._AUTH_STREAM_NEEDED
    318 
    319     elif self._state == self._AUTH_STREAM_NEEDED:
    320       HandleStream(stanza)
    321       self._connection.SendStanza(self._BIND_STANZA, False)
    322       self._state = self._BIND_NEEDED
    323 
    324     elif self._state == self._BIND_NEEDED:
    325       ExpectIq(stanza, 'set', 'bind')
    326       stanza_id = GetStanzaId(stanza)
    327       resource_element = stanza.getElementsByTagName('resource')[0]
    328       resource = resource_element.firstChild.data
    329       full_resource = '%s.%s' % (self._resource_prefix, resource)
    330       response = CloneXml(self._BIND_RESULT_STANZA)
    331       response.setAttribute('id', stanza_id)
    332       self._jid = Jid(self._username, self._domain, full_resource)
    333       jid_text = response.parentNode.createTextNode(str(self._jid))
    334       response.getElementsByTagName('jid')[0].appendChild(jid_text)
    335       self._connection.SendStanza(response)
    336       self._state = self._SESSION_NEEDED
    337 
    338     elif self._state == self._SESSION_NEEDED:
    339       ExpectIq(stanza, 'set', 'session')
    340       stanza_id = GetStanzaId(stanza)
    341       xml = CloneXml(self._IQ_RESPONSE_STANZA)
    342       xml.setAttribute('id', stanza_id)
    343       self._connection.SendStanza(xml)
    344       self._state = self._FINISHED
    345       self._connection.HandshakeDone(self._jid)
    346 
    347 
    348 def AddrString(addr):
    349   return '%s:%d' % addr
    350 
    351 
    352 class XmppConnection(asynchat.async_chat):
    353   """A single XMPP client connection.
    354 
    355   This class handles the connection to a single XMPP client (via a
    356   socket).  It does the XMPP handshake and also implements the (old)
    357   Google notification protocol.
    358   """
    359 
    360   # Used for acknowledgements to the client.
    361   #
    362   # The from and id attributes are filled in later.
    363   _IQ_RESPONSE_STANZA = ParseXml('<iq from="" id="" type="result"/>')
    364 
    365   def __init__(self, sock, socket_map, delegate, addr):
    366     """Starts up the xmpp connection.
    367 
    368     Args:
    369       sock: The socket to the client.
    370       socket_map: A map from sockets to their owning objects.
    371       delegate: The delegate, which is notified when the XMPP
    372         handshake is successful, when the connection is closed, and
    373         when a notification has to be broadcast.
    374       addr: The host/port of the client.
    375     """
    376     # We do this because in versions of python < 2.6,
    377     # async_chat.__init__ doesn't take a map argument nor pass it to
    378     # dispatcher.__init__.  We rely on the fact that
    379     # async_chat.__init__ calls dispatcher.__init__ as the last thing
    380     # it does, and that calling dispatcher.__init__ with socket=None
    381     # and map=None is essentially a no-op.
    382     asynchat.async_chat.__init__(self)
    383     asyncore.dispatcher.__init__(self, sock, socket_map)
    384 
    385     self.set_terminator(None)
    386 
    387     self._delegate = delegate
    388     self._parser = StanzaParser(self)
    389     self._jid = None
    390 
    391     self._addr = addr
    392     addr_str = AddrString(self._addr)
    393     self._handshake_task = HandshakeTask(self, addr_str)
    394     print 'Starting connection to %s' % self
    395 
    396   def __str__(self):
    397     if self._jid:
    398       return str(self._jid)
    399     else:
    400       return AddrString(self._addr)
    401 
    402   # async_chat implementation.
    403 
    404   def collect_incoming_data(self, data):
    405     self._parser.FeedString(data)
    406 
    407   # This is only here to make pychecker happy.
    408   def found_terminator(self):
    409     asynchat.async_chat.found_terminator(self)
    410 
    411   def close(self):
    412     print "Closing connection to %s" % self
    413     self._delegate.OnXmppConnectionClosed(self)
    414     asynchat.async_chat.close(self)
    415 
    416   # Called by self._parser.FeedString().
    417   def FeedStanza(self, stanza):
    418     if self._handshake_task:
    419       self._handshake_task.FeedStanza(stanza)
    420     elif stanza.tagName == 'iq' and stanza.getAttribute('type') == 'result':
    421       # Ignore all client acks.
    422       pass
    423     elif (stanza.firstChild and
    424           stanza.firstChild.namespaceURI == 'google:push'):
    425       self._HandlePushCommand(stanza)
    426     else:
    427       raise UnexpectedXml(stanza)
    428 
    429   # Called by self._handshake_task.
    430   def HandshakeDone(self, jid):
    431     self._jid = jid
    432     self._handshake_task = None
    433     self._delegate.OnXmppHandshakeDone(self)
    434     print "Handshake done for %s" % self
    435 
    436   def _HandlePushCommand(self, stanza):
    437     if stanza.tagName == 'iq' and stanza.firstChild.tagName == 'subscribe':
    438       # Subscription request.
    439       self._SendIqResponseStanza(stanza)
    440     elif stanza.tagName == 'message' and stanza.firstChild.tagName == 'push':
    441       # Send notification request.
    442       self._delegate.ForwardNotification(self, stanza)
    443     else:
    444       raise UnexpectedXml(command_xml)
    445 
    446   def _SendIqResponseStanza(self, iq):
    447     stanza = CloneXml(self._IQ_RESPONSE_STANZA)
    448     stanza.setAttribute('from', str(self._jid.GetBareJid()))
    449     stanza.setAttribute('id', iq.getAttribute('id'))
    450     self.SendStanza(stanza)
    451 
    452   def SendStanza(self, stanza, unlink=True):
    453     """Sends a stanza to the client.
    454 
    455     Args:
    456       stanza: The stanza to send.
    457       unlink: Whether to unlink stanza after sending it. (Pass in
    458       False if stanza is a constant.)
    459     """
    460     self.SendData(stanza.toxml())
    461     if unlink:
    462       stanza.unlink()
    463 
    464   def SendData(self, data):
    465     """Sends raw data to the client.
    466     """
    467     # We explicitly encode to ascii as that is what the client expects
    468     # (some minidom library functions return unicode strings).
    469     self.push(data.encode('ascii'))
    470 
    471   def ForwardNotification(self, notification_stanza):
    472     """Forwards a notification to the client."""
    473     notification_stanza.setAttribute('from', str(self._jid.GetBareJid()))
    474     notification_stanza.setAttribute('to', str(self._jid))
    475     self.SendStanza(notification_stanza, False)
    476 
    477 
    478 class XmppServer(asyncore.dispatcher):
    479   """The main XMPP server class.
    480 
    481   The XMPP server starts accepting connections on the given address
    482   and spawns off XmppConnection objects for each one.
    483 
    484   Use like so:
    485 
    486     socket_map = {}
    487     xmpp_server = xmppserver.XmppServer(socket_map, ('127.0.0.1', 5222))
    488     asyncore.loop(30.0, False, socket_map)
    489   """
    490 
    491   def __init__(self, socket_map, addr):
    492     asyncore.dispatcher.__init__(self, None, socket_map)
    493     self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
    494     self.set_reuse_addr()
    495     self.bind(addr)
    496     self.listen(5)
    497     self._socket_map = socket_map
    498     self._connections = set()
    499     self._handshake_done_connections = set()
    500 
    501   def handle_accept(self):
    502     (sock, addr) = self.accept()
    503     xmpp_connection = XmppConnection(sock, self._socket_map, self, addr)
    504     self._connections.add(xmpp_connection)
    505 
    506   def close(self):
    507     # A copy is necessary since calling close on each connection
    508     # removes it from self._connections.
    509     for connection in self._connections.copy():
    510       connection.close()
    511     asyncore.dispatcher.close(self)
    512 
    513   # XmppConnection delegate methods.
    514   def OnXmppHandshakeDone(self, xmpp_connection):
    515     self._handshake_done_connections.add(xmpp_connection)
    516 
    517   def OnXmppConnectionClosed(self, xmpp_connection):
    518     self._connections.discard(xmpp_connection)
    519     self._handshake_done_connections.discard(xmpp_connection)
    520 
    521   def ForwardNotification(self, unused_xmpp_connection, notification_stanza):
    522     for connection in self._handshake_done_connections:
    523       print 'Sending notification to %s' % connection
    524       connection.ForwardNotification(notification_stanza)
    525