1 import io 2 import socket 3 import datetime 4 import textwrap 5 import unittest 6 import functools 7 import contextlib 8 import os.path 9 import threading 10 11 from test import support 12 from nntplib import NNTP, GroupInfo 13 import nntplib 14 from unittest.mock import patch 15 try: 16 import ssl 17 except ImportError: 18 ssl = None 19 20 21 TIMEOUT = 30 22 certfile = os.path.join(os.path.dirname(__file__), 'keycert3.pem') 23 24 # TODO: 25 # - test the `file` arg to more commands 26 # - test error conditions 27 # - test auth and `usenetrc` 28 29 30 class NetworkedNNTPTestsMixin: 31 32 def test_welcome(self): 33 welcome = self.server.getwelcome() 34 self.assertEqual(str, type(welcome)) 35 36 def test_help(self): 37 resp, lines = self.server.help() 38 self.assertTrue(resp.startswith("100 "), resp) 39 for line in lines: 40 self.assertEqual(str, type(line)) 41 42 def test_list(self): 43 resp, groups = self.server.list() 44 if len(groups) > 0: 45 self.assertEqual(GroupInfo, type(groups[0])) 46 self.assertEqual(str, type(groups[0].group)) 47 48 def test_list_active(self): 49 resp, groups = self.server.list(self.GROUP_PAT) 50 if len(groups) > 0: 51 self.assertEqual(GroupInfo, type(groups[0])) 52 self.assertEqual(str, type(groups[0].group)) 53 54 def test_unknown_command(self): 55 with self.assertRaises(nntplib.NNTPPermanentError) as cm: 56 self.server._shortcmd("XYZZY") 57 resp = cm.exception.response 58 self.assertTrue(resp.startswith("500 "), resp) 59 60 def test_newgroups(self): 61 # gmane gets a constant influx of new groups. In order not to stress 62 # the server too much, we choose a recent date in the past. 
63 dt = datetime.date.today() - datetime.timedelta(days=7) 64 resp, groups = self.server.newgroups(dt) 65 if len(groups) > 0: 66 self.assertIsInstance(groups[0], GroupInfo) 67 self.assertIsInstance(groups[0].group, str) 68 69 def test_description(self): 70 def _check_desc(desc): 71 # Sanity checks 72 self.assertIsInstance(desc, str) 73 self.assertNotIn(self.GROUP_NAME, desc) 74 desc = self.server.description(self.GROUP_NAME) 75 _check_desc(desc) 76 # Another sanity check 77 self.assertIn("Python", desc) 78 # With a pattern 79 desc = self.server.description(self.GROUP_PAT) 80 _check_desc(desc) 81 # Shouldn't exist 82 desc = self.server.description("zk.brrtt.baz") 83 self.assertEqual(desc, '') 84 85 def test_descriptions(self): 86 resp, descs = self.server.descriptions(self.GROUP_PAT) 87 # 215 for LIST NEWSGROUPS, 282 for XGTITLE 88 self.assertTrue( 89 resp.startswith("215 ") or resp.startswith("282 "), resp) 90 self.assertIsInstance(descs, dict) 91 desc = descs[self.GROUP_NAME] 92 self.assertEqual(desc, self.server.description(self.GROUP_NAME)) 93 94 def test_group(self): 95 result = self.server.group(self.GROUP_NAME) 96 self.assertEqual(5, len(result)) 97 resp, count, first, last, group = result 98 self.assertEqual(group, self.GROUP_NAME) 99 self.assertIsInstance(count, int) 100 self.assertIsInstance(first, int) 101 self.assertIsInstance(last, int) 102 self.assertLessEqual(first, last) 103 self.assertTrue(resp.startswith("211 "), resp) 104 105 def test_date(self): 106 resp, date = self.server.date() 107 self.assertIsInstance(date, datetime.datetime) 108 # Sanity check 109 self.assertGreaterEqual(date.year, 1995) 110 self.assertLessEqual(date.year, 2030) 111 112 def _check_art_dict(self, art_dict): 113 # Some sanity checks for a field dictionary returned by OVER / XOVER 114 self.assertIsInstance(art_dict, dict) 115 # NNTP has 7 mandatory fields 116 self.assertGreaterEqual(art_dict.keys(), 117 {"subject", "from", "date", "message-id", 118 "references", ":bytes", 
":lines"} 119 ) 120 for v in art_dict.values(): 121 self.assertIsInstance(v, (str, type(None))) 122 123 def test_xover(self): 124 resp, count, first, last, name = self.server.group(self.GROUP_NAME) 125 resp, lines = self.server.xover(last - 5, last) 126 if len(lines) == 0: 127 self.skipTest("no articles retrieved") 128 # The 'last' article is not necessarily part of the output (cancelled?) 129 art_num, art_dict = lines[0] 130 self.assertGreaterEqual(art_num, last - 5) 131 self.assertLessEqual(art_num, last) 132 self._check_art_dict(art_dict) 133 134 @unittest.skipIf(True, 'temporarily skipped until a permanent solution' 135 ' is found for issue #28971') 136 def test_over(self): 137 resp, count, first, last, name = self.server.group(self.GROUP_NAME) 138 start = last - 10 139 # The "start-" article range form 140 resp, lines = self.server.over((start, None)) 141 art_num, art_dict = lines[0] 142 self._check_art_dict(art_dict) 143 # The "start-end" article range form 144 resp, lines = self.server.over((start, last)) 145 art_num, art_dict = lines[-1] 146 # The 'last' article is not necessarily part of the output (cancelled?) 147 self.assertGreaterEqual(art_num, start) 148 self.assertLessEqual(art_num, last) 149 self._check_art_dict(art_dict) 150 # XXX The "message_id" form is unsupported by gmane 151 # 503 Overview by message-ID unsupported 152 153 def test_xhdr(self): 154 resp, count, first, last, name = self.server.group(self.GROUP_NAME) 155 resp, lines = self.server.xhdr('subject', last) 156 for line in lines: 157 self.assertEqual(str, type(line[1])) 158 159 def check_article_resp(self, resp, article, art_num=None): 160 self.assertIsInstance(article, nntplib.ArticleInfo) 161 if art_num is not None: 162 self.assertEqual(article.number, art_num) 163 for line in article.lines: 164 self.assertIsInstance(line, bytes) 165 # XXX this could exceptionally happen... 
166 self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n")) 167 168 @unittest.skipIf(True, "FIXME: see bpo-32128") 169 def test_article_head_body(self): 170 resp, count, first, last, name = self.server.group(self.GROUP_NAME) 171 # Try to find an available article 172 for art_num in (last, first, last - 1): 173 try: 174 resp, head = self.server.head(art_num) 175 except nntplib.NNTPTemporaryError as e: 176 if not e.response.startswith("423 "): 177 raise 178 # "423 No such article" => choose another one 179 continue 180 break 181 else: 182 self.skipTest("could not find a suitable article number") 183 self.assertTrue(resp.startswith("221 "), resp) 184 self.check_article_resp(resp, head, art_num) 185 resp, body = self.server.body(art_num) 186 self.assertTrue(resp.startswith("222 "), resp) 187 self.check_article_resp(resp, body, art_num) 188 resp, article = self.server.article(art_num) 189 self.assertTrue(resp.startswith("220 "), resp) 190 self.check_article_resp(resp, article, art_num) 191 # Tolerate running the tests from behind a NNTP virus checker 192 blacklist = lambda line: line.startswith(b'X-Antivirus') 193 filtered_head_lines = [line for line in head.lines 194 if not blacklist(line)] 195 filtered_lines = [line for line in article.lines 196 if not blacklist(line)] 197 self.assertEqual(filtered_lines, filtered_head_lines + [b''] + body.lines) 198 199 def test_capabilities(self): 200 # The server under test implements NNTP version 2 and has a 201 # couple of well-known capabilities. Just sanity check that we 202 # got them. 
203 def _check_caps(caps): 204 caps_list = caps['LIST'] 205 self.assertIsInstance(caps_list, (list, tuple)) 206 self.assertIn('OVERVIEW.FMT', caps_list) 207 self.assertGreaterEqual(self.server.nntp_version, 2) 208 _check_caps(self.server.getcapabilities()) 209 # This re-emits the command 210 resp, caps = self.server.capabilities() 211 _check_caps(caps) 212 213 def test_zlogin(self): 214 # This test must be the penultimate because further commands will be 215 # refused. 216 baduser = "notarealuser" 217 badpw = "notarealpassword" 218 # Check that bogus credentials cause failure 219 self.assertRaises(nntplib.NNTPError, self.server.login, 220 user=baduser, password=badpw, usenetrc=False) 221 # FIXME: We should check that correct credentials succeed, but that 222 # would require valid details for some server somewhere to be in the 223 # test suite, I think. Gmane is anonymous, at least as used for the 224 # other tests. 225 226 def test_zzquit(self): 227 # This test must be called last, hence the name 228 cls = type(self) 229 try: 230 self.server.quit() 231 finally: 232 cls.server = None 233 234 @classmethod 235 def wrap_methods(cls): 236 # Wrap all methods in a transient_internet() exception catcher 237 # XXX put a generic version in test.support? 
238 def wrap_meth(meth): 239 @functools.wraps(meth) 240 def wrapped(self): 241 with support.transient_internet(self.NNTP_HOST): 242 meth(self) 243 return wrapped 244 for name in dir(cls): 245 if not name.startswith('test_'): 246 continue 247 meth = getattr(cls, name) 248 if not callable(meth): 249 continue 250 # Need to use a closure so that meth remains bound to its current 251 # value 252 setattr(cls, name, wrap_meth(meth)) 253 254 def test_with_statement(self): 255 def is_connected(): 256 if not hasattr(server, 'file'): 257 return False 258 try: 259 server.help() 260 except (OSError, EOFError): 261 return False 262 return True 263 264 with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server: 265 self.assertTrue(is_connected()) 266 self.assertTrue(server.help()) 267 self.assertFalse(is_connected()) 268 269 with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server: 270 server.quit() 271 self.assertFalse(is_connected()) 272 273 274 NetworkedNNTPTestsMixin.wrap_methods() 275 276 277 EOF_ERRORS = (EOFError,) 278 if ssl is not None: 279 EOF_ERRORS += (ssl.SSLEOFError,) 280 281 282 class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase): 283 # This server supports STARTTLS (gmane doesn't) 284 NNTP_HOST = 'news.trigofacile.com' 285 GROUP_NAME = 'fr.comp.lang.python' 286 GROUP_PAT = 'fr.comp.lang.*' 287 288 NNTP_CLASS = NNTP 289 290 @classmethod 291 def setUpClass(cls): 292 support.requires("network") 293 with support.transient_internet(cls.NNTP_HOST): 294 try: 295 cls.server = cls.NNTP_CLASS(cls.NNTP_HOST, timeout=TIMEOUT, 296 usenetrc=False) 297 except EOF_ERRORS: 298 raise unittest.SkipTest(f"{cls} got EOF error on connecting " 299 f"to {cls.NNTP_HOST!r}") 300 301 @classmethod 302 def tearDownClass(cls): 303 if cls.server is not None: 304 cls.server.quit() 305 306 @unittest.skipUnless(ssl, 'requires SSL support') 307 class NetworkedNNTP_SSLTests(NetworkedNNTPTests): 308 309 # Technical limits for this public 
NNTP server (see http://www.aioe.org): 310 # "Only two concurrent connections per IP address are allowed and 311 # 400 connections per day are accepted from each IP address." 312 313 NNTP_HOST = 'nntp.aioe.org' 314 GROUP_NAME = 'comp.lang.python' 315 GROUP_PAT = 'comp.lang.*' 316 317 NNTP_CLASS = getattr(nntplib, 'NNTP_SSL', None) 318 319 # Disabled as it produces too much data 320 test_list = None 321 322 # Disabled as the connection will already be encrypted. 323 test_starttls = None 324 325 326 # 327 # Non-networked tests using a local server (or something mocking it). 328 # 329 330 class _NNTPServerIO(io.RawIOBase): 331 """A raw IO object allowing NNTP commands to be received and processed 332 by a handler. The handler can push responses which can then be read 333 from the IO object.""" 334 335 def __init__(self, handler): 336 io.RawIOBase.__init__(self) 337 # The channel from the client 338 self.c2s = io.BytesIO() 339 # The channel to the client 340 self.s2c = io.BytesIO() 341 self.handler = handler 342 self.handler.start(self.c2s.readline, self.push_data) 343 344 def readable(self): 345 return True 346 347 def writable(self): 348 return True 349 350 def push_data(self, data): 351 """Push (buffer) some data to send to the client.""" 352 pos = self.s2c.tell() 353 self.s2c.seek(0, 2) 354 self.s2c.write(data) 355 self.s2c.seek(pos) 356 357 def write(self, b): 358 """The client sends us some data""" 359 pos = self.c2s.tell() 360 self.c2s.write(b) 361 self.c2s.seek(pos) 362 self.handler.process_pending() 363 return len(b) 364 365 def readinto(self, buf): 366 """The client wants to read a response""" 367 self.handler.process_pending() 368 b = self.s2c.read(len(buf)) 369 n = len(b) 370 buf[:n] = b 371 return n 372 373 374 def make_mock_file(handler): 375 sio = _NNTPServerIO(handler) 376 # Using BufferedRWPair instead of BufferedRandom ensures the file 377 # isn't seekable. 
378 file = io.BufferedRWPair(sio, sio) 379 return (sio, file) 380 381 382 class MockedNNTPTestsMixin: 383 # Override in derived classes 384 handler_class = None 385 386 def setUp(self): 387 super().setUp() 388 self.make_server() 389 390 def tearDown(self): 391 super().tearDown() 392 del self.server 393 394 def make_server(self, *args, **kwargs): 395 self.handler = self.handler_class() 396 self.sio, file = make_mock_file(self.handler) 397 self.server = nntplib._NNTPBase(file, 'test.server', *args, **kwargs) 398 return self.server 399 400 401 class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin): 402 def setUp(self): 403 super().setUp() 404 self.make_server(readermode=True) 405 406 407 class NNTPv1Handler: 408 """A handler for RFC 977""" 409 410 welcome = "200 NNTP mock server" 411 412 def start(self, readline, push_data): 413 self.in_body = False 414 self.allow_posting = True 415 self._readline = readline 416 self._push_data = push_data 417 self._logged_in = False 418 self._user_sent = False 419 # Our welcome 420 self.handle_welcome() 421 422 def _decode(self, data): 423 return str(data, "utf-8", "surrogateescape") 424 425 def process_pending(self): 426 if self.in_body: 427 while True: 428 line = self._readline() 429 if not line: 430 return 431 self.body.append(line) 432 if line == b".\r\n": 433 break 434 try: 435 meth, tokens = self.body_callback 436 meth(*tokens, body=self.body) 437 finally: 438 self.body_callback = None 439 self.body = None 440 self.in_body = False 441 while True: 442 line = self._decode(self._readline()) 443 if not line: 444 return 445 if not line.endswith("\r\n"): 446 raise ValueError("line doesn't end with \\r\\n: {!r}".format(line)) 447 line = line[:-2] 448 cmd, *tokens = line.split() 449 #meth = getattr(self.handler, "handle_" + cmd.upper(), None) 450 meth = getattr(self, "handle_" + cmd.upper(), None) 451 if meth is None: 452 self.handle_unknown() 453 else: 454 try: 455 meth(*tokens) 456 except Exception as e: 457 raise 
ValueError("command failed: {!r}".format(line)) from e 458 else: 459 if self.in_body: 460 self.body_callback = meth, tokens 461 self.body = [] 462 463 def expect_body(self): 464 """Flag that the client is expected to post a request body""" 465 self.in_body = True 466 467 def push_data(self, data): 468 """Push some binary data""" 469 self._push_data(data) 470 471 def push_lit(self, lit): 472 """Push a string literal""" 473 lit = textwrap.dedent(lit) 474 lit = "\r\n".join(lit.splitlines()) + "\r\n" 475 lit = lit.encode('utf-8') 476 self.push_data(lit) 477 478 def handle_unknown(self): 479 self.push_lit("500 What?") 480 481 def handle_welcome(self): 482 self.push_lit(self.welcome) 483 484 def handle_QUIT(self): 485 self.push_lit("205 Bye!") 486 487 def handle_DATE(self): 488 self.push_lit("111 20100914001155") 489 490 def handle_GROUP(self, group): 491 if group == "fr.comp.lang.python": 492 self.push_lit("211 486 761 1265 fr.comp.lang.python") 493 else: 494 self.push_lit("411 No such group {}".format(group)) 495 496 def handle_HELP(self): 497 self.push_lit("""\ 498 100 Legal commands 499 authinfo user Name|pass Password|generic <prog> <args> 500 date 501 help 502 Report problems to <root (at] example.org> 503 .""") 504 505 def handle_STAT(self, message_spec=None): 506 if message_spec is None: 507 self.push_lit("412 No newsgroup selected") 508 elif message_spec == "3000234": 509 self.push_lit("223 3000234 <45223423 (at] example.com>") 510 elif message_spec == "<45223423 (at] example.com>": 511 self.push_lit("223 0 <45223423 (at] example.com>") 512 else: 513 self.push_lit("430 No Such Article Found") 514 515 def handle_NEXT(self): 516 self.push_lit("223 3000237 <668929 (at] example.org> retrieved") 517 518 def handle_LAST(self): 519 self.push_lit("223 3000234 <45223423 (at] example.com> retrieved") 520 521 def handle_LIST(self, action=None, param=None): 522 if action is None: 523 self.push_lit("""\ 524 215 Newsgroups in form "group high low flags". 
525 comp.lang.python 0000052340 0000002828 y 526 comp.lang.python.announce 0000001153 0000000993 m 527 free.it.comp.lang.python 0000000002 0000000002 y 528 fr.comp.lang.python 0000001254 0000000760 y 529 free.it.comp.lang.python.learner 0000000000 0000000001 y 530 tw.bbs.comp.lang.python 0000000304 0000000304 y 531 .""") 532 elif action == "ACTIVE": 533 if param == "*distutils*": 534 self.push_lit("""\ 535 215 Newsgroups in form "group high low flags" 536 gmane.comp.python.distutils.devel 0000014104 0000000001 m 537 gmane.comp.python.distutils.cvs 0000000000 0000000001 m 538 .""") 539 else: 540 self.push_lit("""\ 541 215 Newsgroups in form "group high low flags" 542 .""") 543 elif action == "OVERVIEW.FMT": 544 self.push_lit("""\ 545 215 Order of fields in overview database. 546 Subject: 547 From: 548 Date: 549 Message-ID: 550 References: 551 Bytes: 552 Lines: 553 Xref:full 554 .""") 555 elif action == "NEWSGROUPS": 556 assert param is not None 557 if param == "comp.lang.python": 558 self.push_lit("""\ 559 215 Descriptions in form "group description". 560 comp.lang.python\tThe Python computer language. 561 .""") 562 elif param == "comp.lang.python*": 563 self.push_lit("""\ 564 215 Descriptions in form "group description". 565 comp.lang.python.announce\tAnnouncements about the Python language. (Moderated) 566 comp.lang.python\tThe Python computer language. 567 .""") 568 else: 569 self.push_lit("""\ 570 215 Descriptions in form "group description". 571 .""") 572 else: 573 self.push_lit('501 Unknown LIST keyword') 574 575 def handle_NEWNEWS(self, group, date_str, time_str): 576 # We hard code different return messages depending on passed 577 # argument and date syntax. 
578 if (group == "comp.lang.python" and date_str == "20100913" 579 and time_str == "082004"): 580 # Date was passed in RFC 3977 format (NNTP "v2") 581 self.push_lit("""\ 582 230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows 583 <a4929a40-6328-491a-aaaf-cb79ed7309a2 (at] q2g2000vbk.googlegroups.com> 584 <f30c0419-f549-4218-848f-d7d0131da931 (at] y3g2000vbm.googlegroups.com> 585 .""") 586 elif (group == "comp.lang.python" and date_str == "100913" 587 and time_str == "082004"): 588 # Date was passed in RFC 977 format (NNTP "v1") 589 self.push_lit("""\ 590 230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows 591 <a4929a40-6328-491a-aaaf-cb79ed7309a2 (at] q2g2000vbk.googlegroups.com> 592 <f30c0419-f549-4218-848f-d7d0131da931 (at] y3g2000vbm.googlegroups.com> 593 .""") 594 elif (group == 'comp.lang.python' and 595 date_str in ('20100101', '100101') and 596 time_str == '090000'): 597 self.push_lit('too long line' * 3000 + 598 '\n.') 599 else: 600 self.push_lit("""\ 601 230 An empty list of newsarticles follows 602 .""") 603 # (Note for experiments: many servers disable NEWNEWS. 604 # As of this writing, sicinfo3.epfl.ch doesn't.) 
605 606 def handle_XOVER(self, message_spec): 607 if message_spec == "57-59": 608 self.push_lit( 609 "224 Overview information for 57-58 follows\n" 610 "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout" 611 "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w (at] public.gmane.org>" 612 "\tSat, 19 Jun 2010 18:04:08 -0400" 613 "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C (at] gmail.com>" 614 "\t<hvalf7$ort$1 (at] dough.gmane.org>\t7103\t16" 615 "\tXref: news.gmane.org gmane.comp.python.authors:57" 616 "\n" 617 "58\tLooking for a few good bloggers" 618 "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w (at] public.gmane.org>" 619 "\tThu, 22 Jul 2010 09:14:14 -0400" 620 "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF (at] gmail.com>" 621 "\t\t6683\t16" 622 "\t" 623 "\n" 624 # A UTF-8 overview line from fr.comp.lang.python 625 "59\tRe: Message d'erreur incomprhensible (par moi)" 626 "\tEric Brunel <eric.brunel (at] pragmadev.nospam.com>" 627 "\tWed, 15 Sep 2010 18:09:15 +0200" 628 "\t<eric.brunel-2B8B56.18091515092010 (at] news.wanadoo.fr>" 629 "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27" 630 "\tXref: saria.nerim.net fr.comp.lang.python:1265" 631 "\n" 632 ".\n") 633 else: 634 self.push_lit("""\ 635 224 No articles 636 .""") 637 638 def handle_POST(self, *, body=None): 639 if body is None: 640 if self.allow_posting: 641 self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>") 642 self.expect_body() 643 else: 644 self.push_lit("440 Posting not permitted") 645 else: 646 assert self.allow_posting 647 self.push_lit("240 Article received OK") 648 self.posted_body = body 649 650 def handle_IHAVE(self, message_id, *, body=None): 651 if body is None: 652 if (self.allow_posting and 653 message_id == "<i.am.an.article.you.will.want (at] example.com>"): 654 self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>") 655 self.expect_body() 656 else: 657 self.push_lit("435 Article not wanted") 658 else: 659 assert self.allow_posting 660 
self.push_lit("235 Article transferred OK") 661 self.posted_body = body 662 663 sample_head = """\ 664 From: "Demo User" <nobody (at] example.net> 665 Subject: I am just a test article 666 Content-Type: text/plain; charset=UTF-8; format=flowed 667 Message-ID: <i.am.an.article.you.will.want (at] example.com>""" 668 669 sample_body = """\ 670 This is just a test article. 671 ..Here is a dot-starting line. 672 673 -- Signed by Andr\xe9.""" 674 675 sample_article = sample_head + "\n\n" + sample_body 676 677 def handle_ARTICLE(self, message_spec=None): 678 if message_spec is None: 679 self.push_lit("220 3000237 <45223423 (at] example.com>") 680 elif message_spec == "<45223423 (at] example.com>": 681 self.push_lit("220 0 <45223423 (at] example.com>") 682 elif message_spec == "3000234": 683 self.push_lit("220 3000234 <45223423 (at] example.com>") 684 else: 685 self.push_lit("430 No Such Article Found") 686 return 687 self.push_lit(self.sample_article) 688 self.push_lit(".") 689 690 def handle_HEAD(self, message_spec=None): 691 if message_spec is None: 692 self.push_lit("221 3000237 <45223423 (at] example.com>") 693 elif message_spec == "<45223423 (at] example.com>": 694 self.push_lit("221 0 <45223423 (at] example.com>") 695 elif message_spec == "3000234": 696 self.push_lit("221 3000234 <45223423 (at] example.com>") 697 else: 698 self.push_lit("430 No Such Article Found") 699 return 700 self.push_lit(self.sample_head) 701 self.push_lit(".") 702 703 def handle_BODY(self, message_spec=None): 704 if message_spec is None: 705 self.push_lit("222 3000237 <45223423 (at] example.com>") 706 elif message_spec == "<45223423 (at] example.com>": 707 self.push_lit("222 0 <45223423 (at] example.com>") 708 elif message_spec == "3000234": 709 self.push_lit("222 3000234 <45223423 (at] example.com>") 710 else: 711 self.push_lit("430 No Such Article Found") 712 return 713 self.push_lit(self.sample_body) 714 self.push_lit(".") 715 716 def handle_AUTHINFO(self, cred_type, data): 717 if 
self._logged_in: 718 self.push_lit('502 Already Logged In') 719 elif cred_type == 'user': 720 if self._user_sent: 721 self.push_lit('482 User Credential Already Sent') 722 else: 723 self.push_lit('381 Password Required') 724 self._user_sent = True 725 elif cred_type == 'pass': 726 self.push_lit('281 Login Successful') 727 self._logged_in = True 728 else: 729 raise Exception('Unknown cred type {}'.format(cred_type)) 730 731 732 class NNTPv2Handler(NNTPv1Handler): 733 """A handler for RFC 3977 (NNTP "v2")""" 734 735 def handle_CAPABILITIES(self): 736 fmt = """\ 737 101 Capability list: 738 VERSION 2 3 739 IMPLEMENTATION INN 2.5.1{} 740 HDR 741 LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT 742 OVER 743 POST 744 READER 745 .""" 746 747 if not self._logged_in: 748 self.push_lit(fmt.format('\n AUTHINFO USER')) 749 else: 750 self.push_lit(fmt.format('')) 751 752 def handle_MODE(self, _): 753 raise Exception('MODE READER sent despite READER has been advertised') 754 755 def handle_OVER(self, message_spec=None): 756 return self.handle_XOVER(message_spec) 757 758 759 class CapsAfterLoginNNTPv2Handler(NNTPv2Handler): 760 """A handler that allows CAPABILITIES only after login""" 761 762 def handle_CAPABILITIES(self): 763 if not self._logged_in: 764 self.push_lit('480 You must log in.') 765 else: 766 super().handle_CAPABILITIES() 767 768 769 class ModeSwitchingNNTPv2Handler(NNTPv2Handler): 770 """A server that starts in transit mode""" 771 772 def __init__(self): 773 self._switched = False 774 775 def handle_CAPABILITIES(self): 776 fmt = """\ 777 101 Capability list: 778 VERSION 2 3 779 IMPLEMENTATION INN 2.5.1 780 HDR 781 LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT 782 OVER 783 POST 784 {}READER 785 .""" 786 if self._switched: 787 self.push_lit(fmt.format('')) 788 else: 789 self.push_lit(fmt.format('MODE-')) 790 791 def handle_MODE(self, what): 792 assert not self._switched and what == 'reader' 793 self._switched = True 794 
self.push_lit('200 Posting allowed') 795 796 797 class NNTPv1v2TestsMixin: 798 799 def setUp(self): 800 super().setUp() 801 802 def test_welcome(self): 803 self.assertEqual(self.server.welcome, self.handler.welcome) 804 805 def test_authinfo(self): 806 if self.nntp_version == 2: 807 self.assertIn('AUTHINFO', self.server._caps) 808 self.server.login('testuser', 'testpw') 809 # if AUTHINFO is gone from _caps we also know that getcapabilities() 810 # has been called after login as it should 811 self.assertNotIn('AUTHINFO', self.server._caps) 812 813 def test_date(self): 814 resp, date = self.server.date() 815 self.assertEqual(resp, "111 20100914001155") 816 self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55)) 817 818 def test_quit(self): 819 self.assertFalse(self.sio.closed) 820 resp = self.server.quit() 821 self.assertEqual(resp, "205 Bye!") 822 self.assertTrue(self.sio.closed) 823 824 def test_help(self): 825 resp, help = self.server.help() 826 self.assertEqual(resp, "100 Legal commands") 827 self.assertEqual(help, [ 828 ' authinfo user Name|pass Password|generic <prog> <args>', 829 ' date', 830 ' help', 831 'Report problems to <root (at] example.org>', 832 ]) 833 834 def test_list(self): 835 resp, groups = self.server.list() 836 self.assertEqual(len(groups), 6) 837 g = groups[1] 838 self.assertEqual(g, 839 GroupInfo("comp.lang.python.announce", "0000001153", 840 "0000000993", "m")) 841 resp, groups = self.server.list("*distutils*") 842 self.assertEqual(len(groups), 2) 843 g = groups[0] 844 self.assertEqual(g, 845 GroupInfo("gmane.comp.python.distutils.devel", "0000014104", 846 "0000000001", "m")) 847 848 def test_stat(self): 849 resp, art_num, message_id = self.server.stat(3000234) 850 self.assertEqual(resp, "223 3000234 <45223423 (at] example.com>") 851 self.assertEqual(art_num, 3000234) 852 self.assertEqual(message_id, "<45223423 (at] example.com>") 853 resp, art_num, message_id = self.server.stat("<45223423 (at] example.com>") 854 
self.assertEqual(resp, "223 0 <45223423 (at] example.com>") 855 self.assertEqual(art_num, 0) 856 self.assertEqual(message_id, "<45223423 (at] example.com>") 857 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 858 self.server.stat("<non.existent.id>") 859 self.assertEqual(cm.exception.response, "430 No Such Article Found") 860 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 861 self.server.stat() 862 self.assertEqual(cm.exception.response, "412 No newsgroup selected") 863 864 def test_next(self): 865 resp, art_num, message_id = self.server.next() 866 self.assertEqual(resp, "223 3000237 <668929 (at] example.org> retrieved") 867 self.assertEqual(art_num, 3000237) 868 self.assertEqual(message_id, "<668929 (at] example.org>") 869 870 def test_last(self): 871 resp, art_num, message_id = self.server.last() 872 self.assertEqual(resp, "223 3000234 <45223423 (at] example.com> retrieved") 873 self.assertEqual(art_num, 3000234) 874 self.assertEqual(message_id, "<45223423 (at] example.com>") 875 876 def test_description(self): 877 desc = self.server.description("comp.lang.python") 878 self.assertEqual(desc, "The Python computer language.") 879 desc = self.server.description("comp.lang.pythonx") 880 self.assertEqual(desc, "") 881 882 def test_descriptions(self): 883 resp, groups = self.server.descriptions("comp.lang.python") 884 self.assertEqual(resp, '215 Descriptions in form "group description".') 885 self.assertEqual(groups, { 886 "comp.lang.python": "The Python computer language.", 887 }) 888 resp, groups = self.server.descriptions("comp.lang.python*") 889 self.assertEqual(groups, { 890 "comp.lang.python": "The Python computer language.", 891 "comp.lang.python.announce": "Announcements about the Python language. 
(Moderated)", 892 }) 893 resp, groups = self.server.descriptions("comp.lang.pythonx") 894 self.assertEqual(groups, {}) 895 896 def test_group(self): 897 resp, count, first, last, group = self.server.group("fr.comp.lang.python") 898 self.assertTrue(resp.startswith("211 "), resp) 899 self.assertEqual(first, 761) 900 self.assertEqual(last, 1265) 901 self.assertEqual(count, 486) 902 self.assertEqual(group, "fr.comp.lang.python") 903 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 904 self.server.group("comp.lang.python.devel") 905 exc = cm.exception 906 self.assertTrue(exc.response.startswith("411 No such group"), 907 exc.response) 908 909 def test_newnews(self): 910 # NEWNEWS comp.lang.python [20]100913 082004 911 dt = datetime.datetime(2010, 9, 13, 8, 20, 4) 912 resp, ids = self.server.newnews("comp.lang.python", dt) 913 expected = ( 914 "230 list of newsarticles (NNTP v{0}) " 915 "created after Mon Sep 13 08:20:04 2010 follows" 916 ).format(self.nntp_version) 917 self.assertEqual(resp, expected) 918 self.assertEqual(ids, [ 919 "<a4929a40-6328-491a-aaaf-cb79ed7309a2 (at] q2g2000vbk.googlegroups.com>", 920 "<f30c0419-f549-4218-848f-d7d0131da931 (at] y3g2000vbm.googlegroups.com>", 921 ]) 922 # NEWNEWS fr.comp.lang.python [20]100913 082004 923 dt = datetime.datetime(2010, 9, 13, 8, 20, 4) 924 resp, ids = self.server.newnews("fr.comp.lang.python", dt) 925 self.assertEqual(resp, "230 An empty list of newsarticles follows") 926 self.assertEqual(ids, []) 927 928 def _check_article_body(self, lines): 929 self.assertEqual(len(lines), 4) 930 self.assertEqual(lines[-1].decode('utf-8'), "-- Signed by Andr.") 931 self.assertEqual(lines[-2], b"") 932 self.assertEqual(lines[-3], b".Here is a dot-starting line.") 933 self.assertEqual(lines[-4], b"This is just a test article.") 934 935 def _check_article_head(self, lines): 936 self.assertEqual(len(lines), 4) 937 self.assertEqual(lines[0], b'From: "Demo User" <nobody (at] example.net>') 938 self.assertEqual(lines[3], 
b"Message-ID: <i.am.an.article.you.will.want (at] example.com>") 939 940 def _check_article_data(self, lines): 941 self.assertEqual(len(lines), 9) 942 self._check_article_head(lines[:4]) 943 self._check_article_body(lines[-4:]) 944 self.assertEqual(lines[4], b"") 945 946 def test_article(self): 947 # ARTICLE 948 resp, info = self.server.article() 949 self.assertEqual(resp, "220 3000237 <45223423 (at] example.com>") 950 art_num, message_id, lines = info 951 self.assertEqual(art_num, 3000237) 952 self.assertEqual(message_id, "<45223423 (at] example.com>") 953 self._check_article_data(lines) 954 # ARTICLE num 955 resp, info = self.server.article(3000234) 956 self.assertEqual(resp, "220 3000234 <45223423 (at] example.com>") 957 art_num, message_id, lines = info 958 self.assertEqual(art_num, 3000234) 959 self.assertEqual(message_id, "<45223423 (at] example.com>") 960 self._check_article_data(lines) 961 # ARTICLE id 962 resp, info = self.server.article("<45223423 (at] example.com>") 963 self.assertEqual(resp, "220 0 <45223423 (at] example.com>") 964 art_num, message_id, lines = info 965 self.assertEqual(art_num, 0) 966 self.assertEqual(message_id, "<45223423 (at] example.com>") 967 self._check_article_data(lines) 968 # Non-existent id 969 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 970 self.server.article("<non-existent (at] example.com>") 971 self.assertEqual(cm.exception.response, "430 No Such Article Found") 972 973 def test_article_file(self): 974 # With a "file" argument 975 f = io.BytesIO() 976 resp, info = self.server.article(file=f) 977 self.assertEqual(resp, "220 3000237 <45223423 (at] example.com>") 978 art_num, message_id, lines = info 979 self.assertEqual(art_num, 3000237) 980 self.assertEqual(message_id, "<45223423 (at] example.com>") 981 self.assertEqual(lines, []) 982 data = f.getvalue() 983 self.assertTrue(data.startswith( 984 b'From: "Demo User" <nobody (at] example.net>\r\n' 985 b'Subject: I am just a test article\r\n' 986 ), ascii(data)) 
987 self.assertTrue(data.endswith( 988 b'This is just a test article.\r\n' 989 b'.Here is a dot-starting line.\r\n' 990 b'\r\n' 991 b'-- Signed by Andr\xc3\xa9.\r\n' 992 ), ascii(data)) 993 994 def test_head(self): 995 # HEAD 996 resp, info = self.server.head() 997 self.assertEqual(resp, "221 3000237 <45223423 (at] example.com>") 998 art_num, message_id, lines = info 999 self.assertEqual(art_num, 3000237) 1000 self.assertEqual(message_id, "<45223423 (at] example.com>") 1001 self._check_article_head(lines) 1002 # HEAD num 1003 resp, info = self.server.head(3000234) 1004 self.assertEqual(resp, "221 3000234 <45223423 (at] example.com>") 1005 art_num, message_id, lines = info 1006 self.assertEqual(art_num, 3000234) 1007 self.assertEqual(message_id, "<45223423 (at] example.com>") 1008 self._check_article_head(lines) 1009 # HEAD id 1010 resp, info = self.server.head("<45223423 (at] example.com>") 1011 self.assertEqual(resp, "221 0 <45223423 (at] example.com>") 1012 art_num, message_id, lines = info 1013 self.assertEqual(art_num, 0) 1014 self.assertEqual(message_id, "<45223423 (at] example.com>") 1015 self._check_article_head(lines) 1016 # Non-existent id 1017 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 1018 self.server.head("<non-existent (at] example.com>") 1019 self.assertEqual(cm.exception.response, "430 No Such Article Found") 1020 1021 def test_head_file(self): 1022 f = io.BytesIO() 1023 resp, info = self.server.head(file=f) 1024 self.assertEqual(resp, "221 3000237 <45223423 (at] example.com>") 1025 art_num, message_id, lines = info 1026 self.assertEqual(art_num, 3000237) 1027 self.assertEqual(message_id, "<45223423 (at] example.com>") 1028 self.assertEqual(lines, []) 1029 data = f.getvalue() 1030 self.assertTrue(data.startswith( 1031 b'From: "Demo User" <nobody (at] example.net>\r\n' 1032 b'Subject: I am just a test article\r\n' 1033 ), ascii(data)) 1034 self.assertFalse(data.endswith( 1035 b'This is just a test article.\r\n' 1036 b'.Here is a 
dot-starting line.\r\n' 1037 b'\r\n' 1038 b'-- Signed by Andr\xc3\xa9.\r\n' 1039 ), ascii(data)) 1040 1041 def test_body(self): 1042 # BODY 1043 resp, info = self.server.body() 1044 self.assertEqual(resp, "222 3000237 <45223423 (at] example.com>") 1045 art_num, message_id, lines = info 1046 self.assertEqual(art_num, 3000237) 1047 self.assertEqual(message_id, "<45223423 (at] example.com>") 1048 self._check_article_body(lines) 1049 # BODY num 1050 resp, info = self.server.body(3000234) 1051 self.assertEqual(resp, "222 3000234 <45223423 (at] example.com>") 1052 art_num, message_id, lines = info 1053 self.assertEqual(art_num, 3000234) 1054 self.assertEqual(message_id, "<45223423 (at] example.com>") 1055 self._check_article_body(lines) 1056 # BODY id 1057 resp, info = self.server.body("<45223423 (at] example.com>") 1058 self.assertEqual(resp, "222 0 <45223423 (at] example.com>") 1059 art_num, message_id, lines = info 1060 self.assertEqual(art_num, 0) 1061 self.assertEqual(message_id, "<45223423 (at] example.com>") 1062 self._check_article_body(lines) 1063 # Non-existent id 1064 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 1065 self.server.body("<non-existent (at] example.com>") 1066 self.assertEqual(cm.exception.response, "430 No Such Article Found") 1067 1068 def test_body_file(self): 1069 f = io.BytesIO() 1070 resp, info = self.server.body(file=f) 1071 self.assertEqual(resp, "222 3000237 <45223423 (at] example.com>") 1072 art_num, message_id, lines = info 1073 self.assertEqual(art_num, 3000237) 1074 self.assertEqual(message_id, "<45223423 (at] example.com>") 1075 self.assertEqual(lines, []) 1076 data = f.getvalue() 1077 self.assertFalse(data.startswith( 1078 b'From: "Demo User" <nobody (at] example.net>\r\n' 1079 b'Subject: I am just a test article\r\n' 1080 ), ascii(data)) 1081 self.assertTrue(data.endswith( 1082 b'This is just a test article.\r\n' 1083 b'.Here is a dot-starting line.\r\n' 1084 b'\r\n' 1085 b'-- Signed by Andr\xc3\xa9.\r\n' 1086 ), 
ascii(data)) 1087 1088 def check_over_xover_resp(self, resp, overviews): 1089 self.assertTrue(resp.startswith("224 "), resp) 1090 self.assertEqual(len(overviews), 3) 1091 art_num, over = overviews[0] 1092 self.assertEqual(art_num, 57) 1093 self.assertEqual(over, { 1094 "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w (at] public.gmane.org>", 1095 "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout", 1096 "date": "Sat, 19 Jun 2010 18:04:08 -0400", 1097 "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C (at] gmail.com>", 1098 "references": "<hvalf7$ort$1 (at] dough.gmane.org>", 1099 ":bytes": "7103", 1100 ":lines": "16", 1101 "xref": "news.gmane.org gmane.comp.python.authors:57" 1102 }) 1103 art_num, over = overviews[1] 1104 self.assertEqual(over["xref"], None) 1105 art_num, over = overviews[2] 1106 self.assertEqual(over["subject"], 1107 "Re: Message d'erreur incomprhensible (par moi)") 1108 1109 def test_xover(self): 1110 resp, overviews = self.server.xover(57, 59) 1111 self.check_over_xover_resp(resp, overviews) 1112 1113 def test_over(self): 1114 # In NNTP "v1", this will fallback on XOVER 1115 resp, overviews = self.server.over((57, 59)) 1116 self.check_over_xover_resp(resp, overviews) 1117 1118 sample_post = ( 1119 b'From: "Demo User" <nobody (at] example.net>\r\n' 1120 b'Subject: I am just a test article\r\n' 1121 b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n' 1122 b'Message-ID: <i.am.an.article.you.will.want (at] example.com>\r\n' 1123 b'\r\n' 1124 b'This is just a test article.\r\n' 1125 b'.Here is a dot-starting line.\r\n' 1126 b'\r\n' 1127 b'-- Signed by Andr\xc3\xa9.\r\n' 1128 ) 1129 1130 def _check_posted_body(self): 1131 # Check the raw body as received by the server 1132 lines = self.handler.posted_body 1133 # One additional line for the "." 
terminator 1134 self.assertEqual(len(lines), 10) 1135 self.assertEqual(lines[-1], b'.\r\n') 1136 self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n') 1137 self.assertEqual(lines[-3], b'\r\n') 1138 self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n') 1139 self.assertEqual(lines[0], b'From: "Demo User" <nobody (at] example.net>\r\n') 1140 1141 def _check_post_ihave_sub(self, func, *args, file_factory): 1142 # First the prepared post with CRLF endings 1143 post = self.sample_post 1144 func_args = args + (file_factory(post),) 1145 self.handler.posted_body = None 1146 resp = func(*func_args) 1147 self._check_posted_body() 1148 # Then the same post with "normal" line endings - they should be 1149 # converted by NNTP.post and NNTP.ihave. 1150 post = self.sample_post.replace(b"\r\n", b"\n") 1151 func_args = args + (file_factory(post),) 1152 self.handler.posted_body = None 1153 resp = func(*func_args) 1154 self._check_posted_body() 1155 return resp 1156 1157 def check_post_ihave(self, func, success_resp, *args): 1158 # With a bytes object 1159 resp = self._check_post_ihave_sub(func, *args, file_factory=bytes) 1160 self.assertEqual(resp, success_resp) 1161 # With a bytearray object 1162 resp = self._check_post_ihave_sub(func, *args, file_factory=bytearray) 1163 self.assertEqual(resp, success_resp) 1164 # With a file object 1165 resp = self._check_post_ihave_sub(func, *args, file_factory=io.BytesIO) 1166 self.assertEqual(resp, success_resp) 1167 # With an iterable of terminated lines 1168 def iterlines(b): 1169 return iter(b.splitlines(keepends=True)) 1170 resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines) 1171 self.assertEqual(resp, success_resp) 1172 # With an iterable of non-terminated lines 1173 def iterlines(b): 1174 return iter(b.splitlines(keepends=False)) 1175 resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines) 1176 self.assertEqual(resp, success_resp) 1177 1178 def test_post(self): 1179 
self.check_post_ihave(self.server.post, "240 Article received OK") 1180 self.handler.allow_posting = False 1181 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 1182 self.server.post(self.sample_post) 1183 self.assertEqual(cm.exception.response, 1184 "440 Posting not permitted") 1185 1186 def test_ihave(self): 1187 self.check_post_ihave(self.server.ihave, "235 Article transferred OK", 1188 "<i.am.an.article.you.will.want (at] example.com>") 1189 with self.assertRaises(nntplib.NNTPTemporaryError) as cm: 1190 self.server.ihave("<another.message.id>", self.sample_post) 1191 self.assertEqual(cm.exception.response, 1192 "435 Article not wanted") 1193 1194 def test_too_long_lines(self): 1195 dt = datetime.datetime(2010, 1, 1, 9, 0, 0) 1196 self.assertRaises(nntplib.NNTPDataError, 1197 self.server.newnews, "comp.lang.python", dt) 1198 1199 1200 class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase): 1201 """Tests an NNTP v1 server (no capabilities).""" 1202 1203 nntp_version = 1 1204 handler_class = NNTPv1Handler 1205 1206 def test_caps(self): 1207 caps = self.server.getcapabilities() 1208 self.assertEqual(caps, {}) 1209 self.assertEqual(self.server.nntp_version, 1) 1210 self.assertEqual(self.server.nntp_implementation, None) 1211 1212 1213 class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase): 1214 """Tests an NNTP v2 server (with capabilities).""" 1215 1216 nntp_version = 2 1217 handler_class = NNTPv2Handler 1218 1219 def test_caps(self): 1220 caps = self.server.getcapabilities() 1221 self.assertEqual(caps, { 1222 'VERSION': ['2', '3'], 1223 'IMPLEMENTATION': ['INN', '2.5.1'], 1224 'AUTHINFO': ['USER'], 1225 'HDR': [], 1226 'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS', 1227 'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'], 1228 'OVER': [], 1229 'POST': [], 1230 'READER': [], 1231 }) 1232 self.assertEqual(self.server.nntp_version, 3) 1233 self.assertEqual(self.server.nntp_implementation, 'INN 2.5.1') 1234 1235 1236 
class CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase):
    """Tests a probably NNTP v2 server with capabilities only after login."""

    nntp_version = 2
    handler_class = CapsAfterLoginNNTPv2Handler

    def test_caps_only_after_login(self):
        # No capabilities are cached before login; they show up afterwards.
        self.assertEqual(self.server._caps, {})
        self.server.login('testuser', 'testpw')
        self.assertIn('VERSION', self.server._caps)


class SendReaderNNTPv2Tests(MockedNNTPWithReaderModeMixin,
                            unittest.TestCase):
    """Same tests as for v2 but we tell NNTP to send MODE READER to a server
    that isn't in READER mode by default."""

    nntp_version = 2
    handler_class = ModeSwitchingNNTPv2Handler

    def test_we_are_in_reader_mode_after_connect(self):
        self.assertIn('READER', self.server._caps)


class MiscTests(unittest.TestCase):
    """Tests for nntplib's module-level helper functions."""

    def test_decode_header(self):
        def gives(a, b):
            self.assertEqual(nntplib.decode_header(a), b)
        gives("", "")
        gives("a plain header", "a plain header")
        gives(" with extra spaces ", " with extra spaces ")
        gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python")
        gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
              " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
              "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées")
        gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
              "Re: problème de matrice")
        # A natively utf-8 header (found in the real world!)
        gives("Re: Message d'erreur incompréhensible (par moi)",
              "Re: Message d'erreur incompréhensible (par moi)")

    def test_parse_overview_fmt(self):
        # The minimal (default) response
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", ":bytes", ":lines"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines"])
        # The minimal response using alternative names
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", "Bytes:", "Lines:"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines"])
        # Variations in casing
        lines = ["subject:", "FROM:", "DaTe:", "message-ID:",
                 "References:", "BYTES:", "Lines:"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines"])
        # First example from RFC 3977
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", ":bytes", ":lines", "Xref:full",
                 "Distribution:full"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines", "xref", "distribution"])
        # Second example from RFC 3977
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", "Bytes:", "Lines:", "Xref:FULL",
                 "Distribution:FULL"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines", "xref", "distribution"])
        # A classic response from INN
        lines = ["Subject:", "From:", "Date:", "Message-ID:",
                 "References:", "Bytes:", "Lines:", "Xref:full"]
        self.assertEqual(nntplib._parse_overview_fmt(lines),
            ["subject", "from", "date", "message-id", "references",
             ":bytes", ":lines", "xref"])

    def test_parse_overview(self):
        fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
        # First example from RFC 3977
        lines = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\tXref: news.example.com misc.test:3000363',
        ]
        overview = nntplib._parse_overview(lines, fmt)
        (art_num, fields), = overview
        self.assertEqual(art_num, 3000234)
        self.assertEqual(fields, {
            'subject': 'I am just a test article',
            'from': '"Demo User" <nobody@example.com>',
            'date': '6 Oct 1998 04:38:40 -0500',
            'message-id': '<45223423@example.com>',
            'references': '<45454@example.net>',
            ':bytes': '1234',
            ':lines': '17',
            'xref': 'news.example.com misc.test:3000363',
        })
        # Second example; here the "Xref" field is totally absent (including
        # the header name) and comes out as None
        lines = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t<45454@example.net>\t1234\t'
            '17\t\t',
        ]
        overview = nntplib._parse_overview(lines, fmt)
        (art_num, fields), = overview
        self.assertIsNone(fields['xref'])
        # Third example; the "Xref" is an empty string, while "references"
        # is a single space.
        lines = [
            '3000234\tI am just a test article\t"Demo User" '
            '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
            '<45223423@example.com>\t \t1234\t'
            '17\tXref: \t',
        ]
        overview = nntplib._parse_overview(lines, fmt)
        (art_num, fields), = overview
        self.assertEqual(fields['references'], ' ')
        self.assertEqual(fields['xref'], '')

    def test_parse_datetime(self):
        def gives(a, b, *c):
            self.assertEqual(nntplib._parse_datetime(a, b),
                             datetime.datetime(*c))
        # Output of DATE command
        gives("19990623135624", None, 1999, 6, 23, 13, 56, 24)
        # Variations
        gives("19990623", "135624", 1999, 6, 23, 13, 56, 24)
        gives("990623", "135624", 1999, 6, 23, 13, 56, 24)
        gives("090623", "135624", 2009, 6, 23, 13, 56, 24)

    def test_unparse_datetime(self):
        # Test non-legacy mode
        # 1) with a datetime
        def gives(y, M, d, h, m, s, date_str, time_str):
            dt = datetime.datetime(y, M, d, h, m, s)
            self.assertEqual(nntplib._unparse_datetime(dt),
                             (date_str, time_str))
            self.assertEqual(nntplib._unparse_datetime(dt, False),
                             (date_str, time_str))
        gives(1999, 6, 23, 13, 56, 24, "19990623", "135624")
        gives(2000, 6, 23, 13, 56, 24, "20000623", "135624")
        gives(2010, 6, 5, 1, 2, 3, "20100605", "010203")

        # 2) with a date
        def gives(y, M, d, date_str, time_str):
            dt = datetime.date(y, M, d)
            self.assertEqual(nntplib._unparse_datetime(dt),
                             (date_str, time_str))
            self.assertEqual(nntplib._unparse_datetime(dt, False),
                             (date_str, time_str))
        gives(1999, 6, 23, "19990623", "000000")
        gives(2000, 6, 23, "20000623", "000000")
        gives(2010, 6, 5, "20100605", "000000")

    def test_unparse_datetime_legacy(self):
        # Test legacy mode (RFC 977)
        # 1) with a datetime
        def gives(y, M, d, h, m, s, date_str, time_str):
            dt = datetime.datetime(y, M, d, h, m, s)
            self.assertEqual(nntplib._unparse_datetime(dt, True),
                             (date_str, time_str))
        gives(1999, 6, 23, 13, 56, 24, "990623", "135624")
        gives(2000, 6, 23, 13, 56, 24, "000623", "135624")
        gives(2010, 6, 5, 1, 2, 3, "100605", "010203")

        # 2) with a date
        def gives(y, M, d, date_str, time_str):
            dt = datetime.date(y, M, d)
            self.assertEqual(nntplib._unparse_datetime(dt, True),
                             (date_str, time_str))
        gives(1999, 6, 23, "990623", "000000")
        gives(2000, 6, 23, "000623", "000000")
        gives(2010, 6, 5, "100605", "000000")

    @unittest.skipUnless(ssl, 'requires SSL support')
    def test_ssl_support(self):
        self.assertTrue(hasattr(nntplib, 'NNTP_SSL'))


class PublicAPITests(unittest.TestCase):
    """Ensures that the correct values are exposed in the public API."""

    def test_module_all_attribute(self):
        self.assertTrue(hasattr(nntplib, '__all__'))
        target_api = ['NNTP', 'NNTPError', 'NNTPReplyError',
                      'NNTPTemporaryError', 'NNTPPermanentError',
                      'NNTPProtocolError', 'NNTPDataError', 'decode_header']
        # NNTP_SSL is only expected when the ssl module could be imported.
        if ssl is not None:
            target_api.append('NNTP_SSL')
        self.assertEqual(set(nntplib.__all__), set(target_api))


class MockSocketTests(unittest.TestCase):
    """Tests involving a mock socket object

    Used where the _NNTPServerIO file object is not enough."""

    nntp_class = nntplib.NNTP

    def check_constructor_error_conditions(
            self, handler_class,
            expected_error_type, expected_error_msg,
            login=None, password=None):
        """Check that constructing the client fails cleanly.

        ``nntplib.socket`` is patched with a stand-in whose connections are
        served by a fresh *handler_class* instance.  Constructing
        ``self.nntp_class`` must raise *expected_error_type* matching
        *expected_error_msg*, and afterwards the mock socket and every file
        made from it must have been closed.
        """

        class mock_socket_module:
            # Looked up on the class itself, hence no "self" parameter.
            def create_connection(address, timeout):
                return MockSocket()

        class MockSocket:
            def close(self):
                nonlocal socket_closed
                socket_closed = True

            def makefile(socket, mode):
                handler = handler_class()
                _, file = make_mock_file(handler)
                files.append(file)
                return file

        socket_closed = False
        files = []
        with patch('nntplib.socket', mock_socket_module), \
             self.assertRaisesRegex(expected_error_type, expected_error_msg):
            self.nntp_class('dummy', user=login, password=password)
        self.assertTrue(socket_closed)
        for f in files:
            self.assertTrue(f.closed)

    def test_bad_welcome(self):
        # Test a bad welcome message
        class Handler(NNTPv1Handler):
            welcome = 'Bad Welcome'
        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPProtocolError, Handler.welcome)

    def test_service_temporarily_unavailable(self):
        # Test service temporarily unavailable
        class Handler(NNTPv1Handler):
            welcome = '400 Service temporarily unavailable'
        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPTemporaryError, Handler.welcome)

    def test_service_permanently_unavailable(self):
        # Test service permanently unavailable
        class Handler(NNTPv1Handler):
            welcome = '502 Service permanently unavailable'
        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPPermanentError, Handler.welcome)

    def test_bad_capabilities(self):
        # Test a bad capabilities response
        class Handler(NNTPv1Handler):
            def handle_CAPABILITIES(self):
                self.push_lit(capabilities_response)
        capabilities_response = '201 bad capability'
        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPReplyError, capabilities_response)

    def test_login_aborted(self):
        # Test a bad authinfo response
        login = 't@e.com'
        password = 'python'

        class Handler(NNTPv1Handler):
            def handle_AUTHINFO(self, *args):
                self.push_lit(authinfo_response)
        authinfo_response = '503 Mechanism not recognized'
        self.check_constructor_error_conditions(
            Handler, nntplib.NNTPPermanentError, authinfo_response,
            login, password)


class bypass_context:
    """Bypass encryption and actual SSL module"""
    # The class itself is passed as the SSL context, so wrap_socket is
    # called as a plain function and takes no "self".
    def wrap_socket(sock, **args):
        return sock


@unittest.skipUnless(ssl, 'requires SSL support')
class MockSslTests(MockSocketTests):
    """Runs the MockSocketTests against NNTP_SSL with encryption bypassed."""

    @staticmethod
    def nntp_class(*pos, **kw):
        return nntplib.NNTP_SSL(*pos, ssl_context=bypass_context, **kw)


class LocalServerTests(unittest.TestCase):
    """Tests against a minimal NNTP server running in a background thread."""

    def setUp(self):
        sock = socket.socket()
        port = support.bind_port(sock)
        sock.listen()
        self.background = threading.Thread(
            target=self.run_server, args=(sock,))
        self.background.start()
        self.addCleanup(self.background.join)

        # Enter the client's context manually so that it stays open for the
        # test and is exited by the cleanup registered below.
        self.nntp = NNTP(support.HOST, port, usenetrc=False).__enter__()
        self.addCleanup(self.nntp.__exit__, None, None, None)

    def run_server(self, sock):
        """Serve a single client on *sock* until it sends QUIT.

        Only CAPABILITIES, STARTTLS and QUIT are understood; anything else
        raises ValueError.
        """
        # Could be generalized to handle more commands in separate methods
        with sock:
            [client, _] = sock.accept()
        with contextlib.ExitStack() as cleanup:
            cleanup.enter_context(client)
            reader = cleanup.enter_context(client.makefile('rb'))
            client.sendall(b'200 Server ready\r\n')
            while True:
                cmd = reader.readline()
                if cmd == b'CAPABILITIES\r\n':
                    client.sendall(
                        b'101 Capability list:\r\n'
                        b'VERSION 2\r\n'
                        b'STARTTLS\r\n'
                        b'.\r\n'
                    )
                elif cmd == b'STARTTLS\r\n':
                    # Drop the plain-text reader, then continue on a
                    # TLS-wrapped socket with a fresh reader.
                    reader.close()
                    client.sendall(b'382 Begin TLS negotiation now\r\n')
                    context = ssl.SSLContext()
                    context.load_cert_chain(certfile)
                    client = context.wrap_socket(
                        client, server_side=True)
                    cleanup.enter_context(client)
                    reader = cleanup.enter_context(client.makefile('rb'))
                elif cmd == b'QUIT\r\n':
                    client.sendall(b'205 Bye!\r\n')
                    break
                else:
                    raise ValueError('Unexpected command {!r}'.format(cmd))

    @unittest.skipUnless(ssl, 'requires SSL support')
    def test_starttls(self):
        file = self.nntp.file
        sock = self.nntp.sock
        self.nntp.starttls()
        # Check that the socket and internal pseudo-file really were
        # changed.
        self.assertNotEqual(file, self.nntp.file)
        self.assertNotEqual(sock, self.nntp.sock)
        # Check that the new socket really is an SSL one
        self.assertIsInstance(self.nntp.sock, ssl.SSLSocket)
        # Check that trying starttls when it's already active fails.
        self.assertRaises(ValueError, self.nntp.starttls)


if __name__ == "__main__":
    unittest.main()