#!/usr/bin/python2.4
# Copyright (c) 2010 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""A bare-bones and non-compliant XMPP server.

Just enough of the protocol is implemented to get it to work with
Chrome's sync notification system.
"""

import asynchat
import asyncore
import base64
import re
import socket
from xml.dom import minidom

# pychecker complains about the use of fileno(), which is implemented
# by asyncore by forwarding to an internal object via __getattr__.
__pychecker__ = 'no-classattr'


class Error(Exception):
  """Error class for this module."""
  pass


class UnexpectedXml(Error):
  """Raised when an unexpected XML element has been encountered."""

  def __init__(self, xml_element):
    """Records the serialized form of the offending element.

    Args:
      xml_element: The minidom element that was not expected.
    """
    xml_text = xml_element.toxml()
    Error.__init__(self, 'Unexpected XML element', xml_text)


def ParseXml(xml_string):
  """Parses the given string as XML and returns a minidom element
  object.

  Args:
    xml_string: A string holding a complete XML document.

  Returns:
    The root (document) element of the parsed document.
  """
  dom = minidom.parseString(xml_string)

  # minidom handles xmlns specially, but there's a bug where it sets
  # the attribute value to None, which causes toxml() or toprettyxml()
  # to break.
  def FixMinidomXmlnsBug(xml_element):
    if xml_element.getAttribute('xmlns') is None:
      xml_element.setAttribute('xmlns', '')

  # Calls fn on xml_element and then, recursively, on every element
  # below it (non-element nodes such as text are skipped).
  def ApplyToAllDescendantElements(xml_element, fn):
    fn(xml_element)
    for node in xml_element.childNodes:
      if node.nodeType == node.ELEMENT_NODE:
        ApplyToAllDescendantElements(node, fn)

  root = dom.documentElement
  ApplyToAllDescendantElements(root, FixMinidomXmlnsBug)
  return root


def CloneXml(xml):
  """Returns a deep copy of the given XML element.

  The whole owning document is cloned, so the returned element is the
  root of a fresh document that can be modified independently.

  Args:
    xml: The XML element, which should be something returned from
      ParseXml() (i.e., a root element).

  Returns:
    The root element of the cloned document.
  """
  return xml.ownerDocument.cloneNode(True).documentElement


class StanzaParser(object):
  """A hacky incremental XML parser.

  StanzaParser consumes data incrementally via FeedString() and feeds
  its delegate complete parsed stanzas (i.e., XML documents) via
  FeedStanza().  Any stanzas passed to FeedStanza() are unlinked after
  the callback is done.

  Use like so:

  class MyClass(object):
    ...
    def __init__(self, ...):
      ...
      self._parser = StanzaParser(self)
      ...

    def SomeFunction(self, ...):
      ...
      self._parser.FeedString(some_data)
      ...

    def FeedStanza(self, stanza):
      ...
      print stanza.toprettyxml()
      ...
  """

  # NOTE(akalin): The following regexps are naive, but necessary since
  # none of the existing Python 2.4/2.5 XML libraries support
  # incremental parsing.  This works well enough for our purposes.
  #
  # The regexps below assume that any present XML element starts at
  # the beginning of the string, but there may be trailing whitespace.

  # Matches an opening stream tag (e.g., '<stream:stream foo="bar">')
  # (assumes that the stream XML namespace is defined in the tag).
  _stream_re = re.compile(r'^(<stream:stream [^>]*>)\s*')

  # Matches an empty element tag (e.g., '<foo bar="baz"/>').
  _empty_element_re = re.compile(r'^(<[^>]*/>)\s*')

  # Matches a non-empty element (e.g., '<foo bar="baz">quux</foo>').
  # Does *not* handle nested elements.
  _non_empty_element_re = re.compile(r'^(<([^ >]*)[^>]*>.*?</\2>)\s*')

  # The closing tag for a stream tag.  We have to insert this
  # ourselves since all XML stanzas are children of the stream tag,
  # which is never closed until the connection is closed.
  _stream_suffix = '</stream:stream>'

  def __init__(self, delegate):
    """Creates a parser with an empty buffer.

    Args:
      delegate: Object whose FeedStanza(stanza) method is called for
        each complete stanza found in the fed data.
    """
    self._buffer = ''
    self._delegate = delegate

  def FeedString(self, data):
    """Consumes the given string data, possibly feeding one or more
    stanzas to the delegate.

    Args:
      data: The next chunk of raw text; it is appended to whatever
        is still unparsed from earlier calls.
    """
    self._buffer += data
    # Keep stripping stanzas off the front of the buffer until none of
    # the patterns matches any more; leftovers wait for more data.
    while (self._ProcessBuffer(self._stream_re, self._stream_suffix) or
           self._ProcessBuffer(self._empty_element_re) or
           self._ProcessBuffer(self._non_empty_element_re)):
      pass

  def _ProcessBuffer(self, regexp, xml_suffix=''):
    """If the buffer matches the given regexp, removes the match from
    the buffer, appends the given suffix, parses it, and feeds it to
    the delegate.

    Args:
      regexp: Compiled pattern tried against the start of the buffer.
      xml_suffix: Text appended to the matched text before parsing.

    Returns:
      Whether or not the buffer matched the given regexp.
    """
    results = regexp.match(self._buffer)
    if not results:
      return False
    xml_text = self._buffer[:results.end()] + xml_suffix
    self._buffer = self._buffer[results.end():]
    stanza = ParseXml(xml_text)
    self._delegate.FeedStanza(stanza)
    # Needed because stanza may have cycles.
    stanza.unlink()
    return True


class Jid(object):
  """Simple struct for an XMPP jid (essentially an e-mail address with
  an optional resource string).
  """

  def __init__(self, username, domain, resource=''):
    """Stores the parts of the jid.

    Args:
      username: The part before the '@'.
      domain: The part after the '@'.
      resource: Optional resource string; empty means a bare jid.
    """
    self.username = username
    self.domain = domain
    self.resource = resource

  def __str__(self):
    """Returns 'username@domain', plus '/resource' if one is set."""
    jid_str = "%s@%s" % (self.username, self.domain)
    if self.resource:
      jid_str += '/' + self.resource
    return jid_str

  def GetBareJid(self):
    """Returns a new Jid with the same username/domain and no resource."""
    return Jid(self.username, self.domain)


class IdGenerator(object):
  """Simple class to generate unique IDs for XMPP messages."""

  def __init__(self, prefix):
    """Starts the counter at zero.

    Args:
      prefix: String placed before the counter in every generated ID.
    """
    self._prefix = prefix
    self._id = 0

  def GetNextId(self):
    """Returns '<prefix>.<counter>' and advances the counter."""
    next_id = "%s.%s" % (self._prefix, self._id)
    self._id += 1
    return next_id


class HandshakeTask(object):
  """Class to handle the initial handshake with a connected XMPP
  client.
  """

  # The handshake states in order.
  (_INITIAL_STREAM_NEEDED,
   _AUTH_NEEDED,
   _AUTH_STREAM_NEEDED,
   _BIND_NEEDED,
   _SESSION_NEEDED,
   _FINISHED) = range(6)

  # Used when in the _INITIAL_STREAM_NEEDED and _AUTH_STREAM_NEEDED
  # states.  Not an XML object as it's only the opening tag.
  #
  # The from and id attributes are filled in later.
  _STREAM_DATA = (
    '<stream:stream from="%s" id="%s" '
    'version="1.0" xmlns:stream="http://etherx.jabber.org/streams" '
    'xmlns="jabber:client">')

  # Used when in the _INITIAL_STREAM_NEEDED state.
  _AUTH_STANZA = ParseXml(
    '<stream:features xmlns:stream="http://etherx.jabber.org/streams">'
    ' <mechanisms xmlns="urn:ietf:params:xml:ns:xmpp-sasl">'
    ' <mechanism>PLAIN</mechanism>'
    ' <mechanism>X-GOOGLE-TOKEN</mechanism>'
    ' </mechanisms>'
    '</stream:features>')

  # Used when in the _AUTH_NEEDED state.
  _AUTH_SUCCESS_STANZA = ParseXml(
    '<success xmlns="urn:ietf:params:xml:ns:xmpp-sasl"/>')

  # Used when in the _AUTH_STREAM_NEEDED state.
  _BIND_STANZA = ParseXml(
    '<stream:features xmlns:stream="http://etherx.jabber.org/streams">'
    ' <bind xmlns="urn:ietf:params:xml:ns:xmpp-bind"/>'
    ' <session xmlns="urn:ietf:params:xml:ns:xmpp-session"/>'
    '</stream:features>')

  # Used when in the _BIND_NEEDED state.
  #
  # The id and jid attributes are filled in later.
  _BIND_RESULT_STANZA = ParseXml(
    '<iq id="" type="result">'
    ' <bind xmlns="urn:ietf:params:xml:ns:xmpp-bind">'
    ' <jid/>'
    ' </bind>'
    '</iq>')

  # Used when in the _SESSION_NEEDED state.
  #
  # The id attribute is filled in later.
  _IQ_RESPONSE_STANZA = ParseXml('<iq id="" type="result"/>')

  def __init__(self, connection, resource_prefix):
    """Sets up a handshake in its initial state.

    Args:
      connection: Object used to talk to the client; its SendData(),
        SendStanza() and HandshakeDone() methods are called.
      resource_prefix: String used both as the prefix for generated
        stream IDs and as the prefix of the bound resource.
    """
    self._connection = connection
    self._id_generator = IdGenerator(resource_prefix)
    self._username = ''
    self._domain = ''
    self._jid = None
    self._resource_prefix = resource_prefix
    self._state = self._INITIAL_STREAM_NEEDED

  def FeedStanza(self, stanza):
    """Inspects the given stanza and changes the handshake state if needed.

    Called when a stanza is received from the client.  Inspects the
    stanza to make sure it has the expected attributes given the
    current state, advances the state if needed, and sends a reply to
    the client if needed.

    Args:
      stanza: The parsed stanza (a minidom element) from the client.

    Raises:
      UnexpectedXml: If the stanza is not the one the current state
        expects.
    """
    def ExpectStanza(stanza, name):
      if stanza.tagName != name:
        raise UnexpectedXml(stanza)

    def ExpectIq(stanza, iq_type, name):
      ExpectStanza(stanza, 'iq')
      if (stanza.getAttribute('type') != iq_type or
          stanza.firstChild.tagName != name):
        raise UnexpectedXml(stanza)

    def GetStanzaId(stanza):
      return stanza.getAttribute('id')

    def HandleStream(stanza):
      ExpectStanza(stanza, 'stream:stream')
      domain = stanza.getAttribute('to')
      if domain:
        self._domain = domain
      SendStreamData()

    def SendStreamData():
      next_id = self._id_generator.GetNextId()
      stream_data = self._STREAM_DATA % (self._domain, next_id)
      self._connection.SendData(stream_data)

    def GetUserDomain(stanza):
      encoded_username_password = stanza.firstChild.data
      username_password = base64.b64decode(encoded_username_password)
      # The decoded payload is three NUL-separated fields; only the
      # middle one (the user, possibly with a domain) is used here.
      (_, username_domain, _) = username_password.split('\0')
      # The domain may be omitted.
      #
      # If we were using python 2.5, we'd be able to do:
      #
      #   username, _, domain = username_domain.partition('@')
      #   if not domain:
      #     domain = self._domain
      at_pos = username_domain.find('@')
      if at_pos != -1:
        username = username_domain[:at_pos]
        domain = username_domain[at_pos+1:]
      else:
        username = username_domain
        domain = self._domain
      return (username, domain)

    if self._state == self._INITIAL_STREAM_NEEDED:
      HandleStream(stanza)
      self._connection.SendStanza(self._AUTH_STANZA, False)
      self._state = self._AUTH_NEEDED

    elif self._state == self._AUTH_NEEDED:
      ExpectStanza(stanza, 'auth')
      (self._username, self._domain) = GetUserDomain(stanza)
      self._connection.SendStanza(self._AUTH_SUCCESS_STANZA, False)
      self._state = self._AUTH_STREAM_NEEDED

    elif self._state == self._AUTH_STREAM_NEEDED:
      HandleStream(stanza)
      self._connection.SendStanza(self._BIND_STANZA, False)
      self._state = self._BIND_NEEDED

    elif self._state == self._BIND_NEEDED:
      ExpectIq(stanza, 'set', 'bind')
      stanza_id = GetStanzaId(stanza)
      resource_element = stanza.getElementsByTagName('resource')[0]
      resource = resource_element.firstChild.data
      full_resource = '%s.%s' % (self._resource_prefix, resource)
      # Work on a clone so the class-level template stays pristine.
      response = CloneXml(self._BIND_RESULT_STANZA)
      response.setAttribute('id', stanza_id)
      self._jid = Jid(self._username, self._domain, full_resource)
      jid_text = response.parentNode.createTextNode(str(self._jid))
      response.getElementsByTagName('jid')[0].appendChild(jid_text)
      self._connection.SendStanza(response)
      self._state = self._SESSION_NEEDED

    elif self._state == self._SESSION_NEEDED:
      ExpectIq(stanza, 'set', 'session')
      stanza_id = GetStanzaId(stanza)
      xml = CloneXml(self._IQ_RESPONSE_STANZA)
      xml.setAttribute('id', stanza_id)
      self._connection.SendStanza(xml)
      self._state = self._FINISHED
      self._connection.HandshakeDone(self._jid)


def AddrString(addr):
  """Formats a (host, port) pair as 'host:port'."""
  return '%s:%d' % addr


class XmppConnection(asynchat.async_chat):
  """A single XMPP client connection.

  This class handles the connection to a single XMPP client (via a
  socket).  It does the XMPP handshake and also implements the (old)
  Google notification protocol.
  """

  # Used for acknowledgements to the client.
  #
  # The from and id attributes are filled in later.
  _IQ_RESPONSE_STANZA = ParseXml('<iq from="" id="" type="result"/>')

  def __init__(self, sock, socket_map, delegate, addr):
    """Starts up the xmpp connection.

    Args:
      sock: The socket to the client.
      socket_map: A map from sockets to their owning objects.
      delegate: The delegate, which is notified when the XMPP
        handshake is successful, when the connection is closed, and
        when a notification has to be broadcast.
      addr: The host/port of the client.
    """
    # We do this because in versions of python < 2.6,
    # async_chat.__init__ doesn't take a map argument nor pass it to
    # dispatcher.__init__.  We rely on the fact that
    # async_chat.__init__ calls dispatcher.__init__ as the last thing
    # it does, and that calling dispatcher.__init__ with socket=None
    # and map=None is essentially a no-op.
    asynchat.async_chat.__init__(self)
    asyncore.dispatcher.__init__(self, sock, socket_map)

    # No terminator: all incoming data goes straight to
    # collect_incoming_data().
    self.set_terminator(None)

    self._delegate = delegate
    self._parser = StanzaParser(self)
    self._jid = None

    self._addr = addr
    addr_str = AddrString(self._addr)
    self._handshake_task = HandshakeTask(self, addr_str)
    print('Starting connection to %s' % self)

  def __str__(self):
    """Returns the jid once the handshake is done, else 'host:port'."""
    if self._jid:
      return str(self._jid)
    else:
      return AddrString(self._addr)

  # async_chat implementation.

  def collect_incoming_data(self, data):
    """Passes data received from the socket on to the stanza parser."""
    self._parser.FeedString(data)

  # This is only here to make pychecker happy.
  def found_terminator(self):
    asynchat.async_chat.found_terminator(self)

  def close(self):
    """Notifies the delegate, then closes the underlying channel."""
    print("Closing connection to %s" % self)
    self._delegate.OnXmppConnectionClosed(self)
    asynchat.async_chat.close(self)

  # Called by self._parser.FeedString().
  def FeedStanza(self, stanza):
    """Dispatches a parsed stanza received from the client.

    Until the handshake finishes every stanza goes to the handshake
    task.  Afterwards client acks are ignored and 'google:push'
    commands are handled.

    Args:
      stanza: The parsed stanza (a minidom element).

    Raises:
      UnexpectedXml: If the stanza is not recognized.
    """
    if self._handshake_task:
      self._handshake_task.FeedStanza(stanza)
    elif stanza.tagName == 'iq' and stanza.getAttribute('type') == 'result':
      # Ignore all client acks.
      pass
    elif (stanza.firstChild and
          stanza.firstChild.namespaceURI == 'google:push'):
      self._HandlePushCommand(stanza)
    else:
      raise UnexpectedXml(stanza)

  # Called by self._handshake_task.
  def HandshakeDone(self, jid):
    """Records the bound jid and tells the delegate the handshake is done.

    Args:
      jid: The Jid bound to this connection during the handshake.
    """
    self._jid = jid
    self._handshake_task = None
    self._delegate.OnXmppHandshakeDone(self)
    print("Handshake done for %s" % self)

  def _HandlePushCommand(self, stanza):
    """Handles a stanza whose first child is in the 'google:push'
    namespace.

    Args:
      stanza: Either an iq/subscribe request, which is acknowledged,
        or a message/push request, which is handed to the delegate
        for forwarding.

    Raises:
      UnexpectedXml: If the stanza is neither of the above.
    """
    if stanza.tagName == 'iq' and stanza.firstChild.tagName == 'subscribe':
      # Subscription request.
      self._SendIqResponseStanza(stanza)
    elif stanza.tagName == 'message' and stanza.firstChild.tagName == 'push':
      # Send notification request.
      self._delegate.ForwardNotification(self, stanza)
    else:
      # Report the offending stanza itself (this used to reference an
      # undefined name and so raised NameError instead).
      raise UnexpectedXml(stanza)

  def _SendIqResponseStanza(self, iq):
    """Sends a result iq acknowledging the given iq.

    Args:
      iq: The iq stanza being acknowledged; its id is echoed back.
    """
    stanza = CloneXml(self._IQ_RESPONSE_STANZA)
    stanza.setAttribute('from', str(self._jid.GetBareJid()))
    stanza.setAttribute('id', iq.getAttribute('id'))
    self.SendStanza(stanza)

  def SendStanza(self, stanza, unlink=True):
    """Sends a stanza to the client.

    Args:
      stanza: The stanza to send.
      unlink: Whether to unlink stanza after sending it.  (Pass in
        False if stanza is a constant.)
    """
    self.SendData(stanza.toxml())
    if unlink:
      stanza.unlink()

  def SendData(self, data):
    """Sends raw data to the client.

    Args:
      data: The text to send.
    """
    # We explicitly encode to ascii as that is what the client expects
    # (some minidom library functions return unicode strings).
    self.push(data.encode('ascii'))

  def ForwardNotification(self, notification_stanza):
    """Forwards a notification to the client.

    The stanza's from/to attributes are overwritten in place with this
    connection's bare and full jid before it is sent.

    Args:
      notification_stanza: The notification stanza to forward; it is
        not unlinked, since the caller still owns it.
    """
    notification_stanza.setAttribute('from', str(self._jid.GetBareJid()))
    notification_stanza.setAttribute('to', str(self._jid))
    self.SendStanza(notification_stanza, False)


class XmppServer(asyncore.dispatcher):
  """The main XMPP server class.

  The XMPP server starts accepting connections on the given address
  and spawns off XmppConnection objects for each one.

  Use like so:

    socket_map = {}
    xmpp_server = xmppserver.XmppServer(socket_map, ('127.0.0.1', 5222))
    asyncore.loop(30.0, False, socket_map)
  """

  def __init__(self, socket_map, addr):
    """Binds to the given address and starts listening.

    Args:
      socket_map: The asyncore socket map shared with all connections.
      addr: The (host, port) pair to listen on.
    """
    asyncore.dispatcher.__init__(self, None, socket_map)
    self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
    self.set_reuse_addr()
    self.bind(addr)
    self.listen(5)
    self._socket_map = socket_map
    self._connections = set()
    self._handshake_done_connections = set()

  def handle_accept(self):
    """Accepts a pending client and wraps it in an XmppConnection."""
    (sock, addr) = self.accept()
    xmpp_connection = XmppConnection(sock, self._socket_map, self, addr)
    self._connections.add(xmpp_connection)

  def close(self):
    """Closes every client connection, then the listening socket."""
    # A copy is necessary since calling close on each connection
    # removes it from self._connections.
    for connection in self._connections.copy():
      connection.close()
    asyncore.dispatcher.close(self)

  # XmppConnection delegate methods.
  def OnXmppHandshakeDone(self, xmpp_connection):
    """Marks the connection as eligible to receive notifications."""
    self._handshake_done_connections.add(xmpp_connection)

  def OnXmppConnectionClosed(self, xmpp_connection):
    """Forgets a connection that has been closed."""
    self._connections.discard(xmpp_connection)
    self._handshake_done_connections.discard(xmpp_connection)

  def ForwardNotification(self, unused_xmpp_connection, notification_stanza):
    """Sends the notification to every connection whose handshake is
    done.

    Args:
      unused_xmpp_connection: The connection the notification came
        from (not used).
      notification_stanza: The notification stanza to broadcast.
    """
    # Iterate over a copy: if a send fails and the connection closes
    # itself, OnXmppConnectionClosed() removes it from the set while
    # we are still looping.
    for connection in self._handshake_done_connections.copy():
      print('Sending notification to %s' % connection)
      connection.ForwardNotification(notification_stanza)