# (code-browser navigation residue: Home | History | Annotate | Download | only in test)
      1 import io
      2 import socket
      3 import datetime
      4 import textwrap
      5 import unittest
      6 import functools
      7 import contextlib
      8 import os.path
      9 from test import support
     10 from nntplib import NNTP, GroupInfo
     11 import nntplib
     12 from unittest.mock import patch
     13 try:
     14     import ssl
     15 except ImportError:
     16     ssl = None
     17 try:
     18     import threading
     19 except ImportError:
     20     threading = None
     21 
# Value passed as ``timeout=`` whenever the networked tests open a connection.
TIMEOUT = 30
# Path of 'keycert3.pem', resolved relative to the directory of this module.
# NOTE(review): not referenced in the visible part of the file; presumably
# consumed by TLS-related tests further down -- confirm.
certfile = os.path.join(os.path.dirname(__file__), 'keycert3.pem')
     24 
     25 # TODO:
     26 # - test the `file` arg to more commands
     27 # - test error conditions
     28 # - test auth and `usenetrc`
     29 
     30 
     31 class NetworkedNNTPTestsMixin:
     32 
     33     def test_welcome(self):
     34         welcome = self.server.getwelcome()
     35         self.assertEqual(str, type(welcome))
     36 
     37     def test_help(self):
     38         resp, lines = self.server.help()
     39         self.assertTrue(resp.startswith("100 "), resp)
     40         for line in lines:
     41             self.assertEqual(str, type(line))
     42 
     43     def test_list(self):
     44         resp, groups = self.server.list()
     45         if len(groups) > 0:
     46             self.assertEqual(GroupInfo, type(groups[0]))
     47             self.assertEqual(str, type(groups[0].group))
     48 
     49     def test_list_active(self):
     50         resp, groups = self.server.list(self.GROUP_PAT)
     51         if len(groups) > 0:
     52             self.assertEqual(GroupInfo, type(groups[0]))
     53             self.assertEqual(str, type(groups[0].group))
     54 
     55     def test_unknown_command(self):
     56         with self.assertRaises(nntplib.NNTPPermanentError) as cm:
     57             self.server._shortcmd("XYZZY")
     58         resp = cm.exception.response
     59         self.assertTrue(resp.startswith("500 "), resp)
     60 
     61     def test_newgroups(self):
     62         # gmane gets a constant influx of new groups.  In order not to stress
     63         # the server too much, we choose a recent date in the past.
     64         dt = datetime.date.today() - datetime.timedelta(days=7)
     65         resp, groups = self.server.newgroups(dt)
     66         if len(groups) > 0:
     67             self.assertIsInstance(groups[0], GroupInfo)
     68             self.assertIsInstance(groups[0].group, str)
     69 
     70     def test_description(self):
     71         def _check_desc(desc):
     72             # Sanity checks
     73             self.assertIsInstance(desc, str)
     74             self.assertNotIn(self.GROUP_NAME, desc)
     75         desc = self.server.description(self.GROUP_NAME)
     76         _check_desc(desc)
     77         # Another sanity check
     78         self.assertIn("Python", desc)
     79         # With a pattern
     80         desc = self.server.description(self.GROUP_PAT)
     81         _check_desc(desc)
     82         # Shouldn't exist
     83         desc = self.server.description("zk.brrtt.baz")
     84         self.assertEqual(desc, '')
     85 
     86     def test_descriptions(self):
     87         resp, descs = self.server.descriptions(self.GROUP_PAT)
     88         # 215 for LIST NEWSGROUPS, 282 for XGTITLE
     89         self.assertTrue(
     90             resp.startswith("215 ") or resp.startswith("282 "), resp)
     91         self.assertIsInstance(descs, dict)
     92         desc = descs[self.GROUP_NAME]
     93         self.assertEqual(desc, self.server.description(self.GROUP_NAME))
     94 
     95     def test_group(self):
     96         result = self.server.group(self.GROUP_NAME)
     97         self.assertEqual(5, len(result))
     98         resp, count, first, last, group = result
     99         self.assertEqual(group, self.GROUP_NAME)
    100         self.assertIsInstance(count, int)
    101         self.assertIsInstance(first, int)
    102         self.assertIsInstance(last, int)
    103         self.assertLessEqual(first, last)
    104         self.assertTrue(resp.startswith("211 "), resp)
    105 
    106     def test_date(self):
    107         resp, date = self.server.date()
    108         self.assertIsInstance(date, datetime.datetime)
    109         # Sanity check
    110         self.assertGreaterEqual(date.year, 1995)
    111         self.assertLessEqual(date.year, 2030)
    112 
    113     def _check_art_dict(self, art_dict):
    114         # Some sanity checks for a field dictionary returned by OVER / XOVER
    115         self.assertIsInstance(art_dict, dict)
    116         # NNTP has 7 mandatory fields
    117         self.assertGreaterEqual(art_dict.keys(),
    118             {"subject", "from", "date", "message-id",
    119              "references", ":bytes", ":lines"}
    120             )
    121         for v in art_dict.values():
    122             self.assertIsInstance(v, (str, type(None)))
    123 
    124     def test_xover(self):
    125         resp, count, first, last, name = self.server.group(self.GROUP_NAME)
    126         resp, lines = self.server.xover(last - 5, last)
    127         if len(lines) == 0:
    128             self.skipTest("no articles retrieved")
    129         # The 'last' article is not necessarily part of the output (cancelled?)
    130         art_num, art_dict = lines[0]
    131         self.assertGreaterEqual(art_num, last - 5)
    132         self.assertLessEqual(art_num, last)
    133         self._check_art_dict(art_dict)
    134 
    135     @unittest.skipIf(True, 'temporarily skipped until a permanent solution'
    136                            ' is found for issue #28971')
    137     def test_over(self):
    138         resp, count, first, last, name = self.server.group(self.GROUP_NAME)
    139         start = last - 10
    140         # The "start-" article range form
    141         resp, lines = self.server.over((start, None))
    142         art_num, art_dict = lines[0]
    143         self._check_art_dict(art_dict)
    144         # The "start-end" article range form
    145         resp, lines = self.server.over((start, last))
    146         art_num, art_dict = lines[-1]
    147         # The 'last' article is not necessarily part of the output (cancelled?)
    148         self.assertGreaterEqual(art_num, start)
    149         self.assertLessEqual(art_num, last)
    150         self._check_art_dict(art_dict)
    151         # XXX The "message_id" form is unsupported by gmane
    152         # 503 Overview by message-ID unsupported
    153 
    154     def test_xhdr(self):
    155         resp, count, first, last, name = self.server.group(self.GROUP_NAME)
    156         resp, lines = self.server.xhdr('subject', last)
    157         for line in lines:
    158             self.assertEqual(str, type(line[1]))
    159 
    160     def check_article_resp(self, resp, article, art_num=None):
    161         self.assertIsInstance(article, nntplib.ArticleInfo)
    162         if art_num is not None:
    163             self.assertEqual(article.number, art_num)
    164         for line in article.lines:
    165             self.assertIsInstance(line, bytes)
    166         # XXX this could exceptionally happen...
    167         self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))
    168 
    169     def test_article_head_body(self):
    170         resp, count, first, last, name = self.server.group(self.GROUP_NAME)
    171         # Try to find an available article
    172         for art_num in (last, first, last - 1):
    173             try:
    174                 resp, head = self.server.head(art_num)
    175             except nntplib.NNTPTemporaryError as e:
    176                 if not e.response.startswith("423 "):
    177                     raise
    178                 # "423 No such article" => choose another one
    179                 continue
    180             break
    181         else:
    182             self.skipTest("could not find a suitable article number")
    183         self.assertTrue(resp.startswith("221 "), resp)
    184         self.check_article_resp(resp, head, art_num)
    185         resp, body = self.server.body(art_num)
    186         self.assertTrue(resp.startswith("222 "), resp)
    187         self.check_article_resp(resp, body, art_num)
    188         resp, article = self.server.article(art_num)
    189         self.assertTrue(resp.startswith("220 "), resp)
    190         self.check_article_resp(resp, article, art_num)
    191         # Tolerate running the tests from behind a NNTP virus checker
    192         blacklist = lambda line: line.startswith(b'X-Antivirus')
    193         filtered_head_lines = [line for line in head.lines
    194                                if not blacklist(line)]
    195         filtered_lines = [line for line in article.lines
    196                           if not blacklist(line)]
    197         self.assertEqual(filtered_lines, filtered_head_lines + [b''] + body.lines)
    198 
    199     def test_capabilities(self):
    200         # The server under test implements NNTP version 2 and has a
    201         # couple of well-known capabilities. Just sanity check that we
    202         # got them.
    203         def _check_caps(caps):
    204             caps_list = caps['LIST']
    205             self.assertIsInstance(caps_list, (list, tuple))
    206             self.assertIn('OVERVIEW.FMT', caps_list)
    207         self.assertGreaterEqual(self.server.nntp_version, 2)
    208         _check_caps(self.server.getcapabilities())
    209         # This re-emits the command
    210         resp, caps = self.server.capabilities()
    211         _check_caps(caps)
    212 
    213     def test_zlogin(self):
    214         # This test must be the penultimate because further commands will be
    215         # refused.
    216         baduser = "notarealuser"
    217         badpw = "notarealpassword"
    218         # Check that bogus credentials cause failure
    219         self.assertRaises(nntplib.NNTPError, self.server.login,
    220                           user=baduser, password=badpw, usenetrc=False)
    221         # FIXME: We should check that correct credentials succeed, but that
    222         # would require valid details for some server somewhere to be in the
    223         # test suite, I think. Gmane is anonymous, at least as used for the
    224         # other tests.
    225 
    226     def test_zzquit(self):
    227         # This test must be called last, hence the name
    228         cls = type(self)
    229         try:
    230             self.server.quit()
    231         finally:
    232             cls.server = None
    233 
    234     @classmethod
    235     def wrap_methods(cls):
    236         # Wrap all methods in a transient_internet() exception catcher
    237         # XXX put a generic version in test.support?
    238         def wrap_meth(meth):
    239             @functools.wraps(meth)
    240             def wrapped(self):
    241                 with support.transient_internet(self.NNTP_HOST):
    242                     meth(self)
    243             return wrapped
    244         for name in dir(cls):
    245             if not name.startswith('test_'):
    246                 continue
    247             meth = getattr(cls, name)
    248             if not callable(meth):
    249                 continue
    250             # Need to use a closure so that meth remains bound to its current
    251             # value
    252             setattr(cls, name, wrap_meth(meth))
    253 
    254     def test_with_statement(self):
    255         def is_connected():
    256             if not hasattr(server, 'file'):
    257                 return False
    258             try:
    259                 server.help()
    260             except (OSError, EOFError):
    261                 return False
    262             return True
    263 
    264         with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server:
    265             self.assertTrue(is_connected())
    266             self.assertTrue(server.help())
    267         self.assertFalse(is_connected())
    268 
    269         with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server:
    270             server.quit()
    271         self.assertFalse(is_connected())
    272 
    273 
# Replace the mixin's test_* methods with their transient_internet()-wrapped
# versions once, right after the class is defined.
NetworkedNNTPTestsMixin.wrap_methods()
    275 
    276 
    277 class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
    278     # This server supports STARTTLS (gmane doesn't)
    279     NNTP_HOST = 'news.trigofacile.com'
    280     GROUP_NAME = 'fr.comp.lang.python'
    281     GROUP_PAT = 'fr.comp.lang.*'
    282 
    283     NNTP_CLASS = NNTP
    284 
    285     @classmethod
    286     def setUpClass(cls):
    287         support.requires("network")
    288         with support.transient_internet(cls.NNTP_HOST):
    289             cls.server = cls.NNTP_CLASS(cls.NNTP_HOST, timeout=TIMEOUT, usenetrc=False)
    290 
    291     @classmethod
    292     def tearDownClass(cls):
    293         if cls.server is not None:
    294             cls.server.quit()
    295 
    296 @unittest.skipUnless(ssl, 'requires SSL support')
    297 class NetworkedNNTP_SSLTests(NetworkedNNTPTests):
    298 
    299     # Technical limits for this public NNTP server (see http://www.aioe.org):
    300     # "Only two concurrent connections per IP address are allowed and
    301     # 400 connections per day are accepted from each IP address."
    302 
    303     NNTP_HOST = 'nntp.aioe.org'
    304     GROUP_NAME = 'comp.lang.python'
    305     GROUP_PAT = 'comp.lang.*'
    306 
    307     NNTP_CLASS = getattr(nntplib, 'NNTP_SSL', None)
    308 
    309     # Disabled as it produces too much data
    310     test_list = None
    311 
    312     # Disabled as the connection will already be encrypted.
    313     test_starttls = None
    314 
    315 
    316 #
    317 # Non-networked tests using a local server (or something mocking it).
    318 #
    319 
    320 class _NNTPServerIO(io.RawIOBase):
    321     """A raw IO object allowing NNTP commands to be received and processed
    322     by a handler.  The handler can push responses which can then be read
    323     from the IO object."""
    324 
    325     def __init__(self, handler):
    326         io.RawIOBase.__init__(self)
    327         # The channel from the client
    328         self.c2s = io.BytesIO()
    329         # The channel to the client
    330         self.s2c = io.BytesIO()
    331         self.handler = handler
    332         self.handler.start(self.c2s.readline, self.push_data)
    333 
    334     def readable(self):
    335         return True
    336 
    337     def writable(self):
    338         return True
    339 
    340     def push_data(self, data):
    341         """Push (buffer) some data to send to the client."""
    342         pos = self.s2c.tell()
    343         self.s2c.seek(0, 2)
    344         self.s2c.write(data)
    345         self.s2c.seek(pos)
    346 
    347     def write(self, b):
    348         """The client sends us some data"""
    349         pos = self.c2s.tell()
    350         self.c2s.write(b)
    351         self.c2s.seek(pos)
    352         self.handler.process_pending()
    353         return len(b)
    354 
    355     def readinto(self, buf):
    356         """The client wants to read a response"""
    357         self.handler.process_pending()
    358         b = self.s2c.read(len(buf))
    359         n = len(b)
    360         buf[:n] = b
    361         return n
    362 
    363 
    364 def make_mock_file(handler):
    365     sio = _NNTPServerIO(handler)
    366     # Using BufferedRWPair instead of BufferedRandom ensures the file
    367     # isn't seekable.
    368     file = io.BufferedRWPair(sio, sio)
    369     return (sio, file)
    370 
    371 
    372 class MockedNNTPTestsMixin:
    373     # Override in derived classes
    374     handler_class = None
    375 
    376     def setUp(self):
    377         super().setUp()
    378         self.make_server()
    379 
    380     def tearDown(self):
    381         super().tearDown()
    382         del self.server
    383 
    384     def make_server(self, *args, **kwargs):
    385         self.handler = self.handler_class()
    386         self.sio, file = make_mock_file(self.handler)
    387         self.server = nntplib._NNTPBase(file, 'test.server', *args, **kwargs)
    388         return self.server
    389 
    390 
    391 class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin):
    392     def setUp(self):
    393         super().setUp()
    394         self.make_server(readermode=True)
    395 
    396 
    397 class NNTPv1Handler:
    398     """A handler for RFC 977"""
    399 
    400     welcome = "200 NNTP mock server"
    401 
    402     def start(self, readline, push_data):
    403         self.in_body = False
    404         self.allow_posting = True
    405         self._readline = readline
    406         self._push_data = push_data
    407         self._logged_in = False
    408         self._user_sent = False
    409         # Our welcome
    410         self.handle_welcome()
    411 
    412     def _decode(self, data):
    413         return str(data, "utf-8", "surrogateescape")
    414 
    415     def process_pending(self):
    416         if self.in_body:
    417             while True:
    418                 line = self._readline()
    419                 if not line:
    420                     return
    421                 self.body.append(line)
    422                 if line == b".\r\n":
    423                     break
    424             try:
    425                 meth, tokens = self.body_callback
    426                 meth(*tokens, body=self.body)
    427             finally:
    428                 self.body_callback = None
    429                 self.body = None
    430                 self.in_body = False
    431         while True:
    432             line = self._decode(self._readline())
    433             if not line:
    434                 return
    435             if not line.endswith("\r\n"):
    436                 raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
    437             line = line[:-2]
    438             cmd, *tokens = line.split()
    439             #meth = getattr(self.handler, "handle_" + cmd.upper(), None)
    440             meth = getattr(self, "handle_" + cmd.upper(), None)
    441             if meth is None:
    442                 self.handle_unknown()
    443             else:
    444                 try:
    445                     meth(*tokens)
    446                 except Exception as e:
    447                     raise ValueError("command failed: {!r}".format(line)) from e
    448                 else:
    449                     if self.in_body:
    450                         self.body_callback = meth, tokens
    451                         self.body = []
    452 
    453     def expect_body(self):
    454         """Flag that the client is expected to post a request body"""
    455         self.in_body = True
    456 
    457     def push_data(self, data):
    458         """Push some binary data"""
    459         self._push_data(data)
    460 
    461     def push_lit(self, lit):
    462         """Push a string literal"""
    463         lit = textwrap.dedent(lit)
    464         lit = "\r\n".join(lit.splitlines()) + "\r\n"
    465         lit = lit.encode('utf-8')
    466         self.push_data(lit)
    467 
    468     def handle_unknown(self):
    469         self.push_lit("500 What?")
    470 
    471     def handle_welcome(self):
    472         self.push_lit(self.welcome)
    473 
    474     def handle_QUIT(self):
    475         self.push_lit("205 Bye!")
    476 
    477     def handle_DATE(self):
    478         self.push_lit("111 20100914001155")
    479 
    480     def handle_GROUP(self, group):
    481         if group == "fr.comp.lang.python":
    482             self.push_lit("211 486 761 1265 fr.comp.lang.python")
    483         else:
    484             self.push_lit("411 No such group {}".format(group))
    485 
    486     def handle_HELP(self):
    487         self.push_lit("""\
    488             100 Legal commands
    489               authinfo user Name|pass Password|generic <prog> <args>
    490               date
    491               help
    492             Report problems to <root (at] example.org>
    493             .""")
    494 
    495     def handle_STAT(self, message_spec=None):
    496         if message_spec is None:
    497             self.push_lit("412 No newsgroup selected")
    498         elif message_spec == "3000234":
    499             self.push_lit("223 3000234 <45223423 (at] example.com>")
    500         elif message_spec == "<45223423 (at] example.com>":
    501             self.push_lit("223 0 <45223423 (at] example.com>")
    502         else:
    503             self.push_lit("430 No Such Article Found")
    504 
    505     def handle_NEXT(self):
    506         self.push_lit("223 3000237 <668929 (at] example.org> retrieved")
    507 
    508     def handle_LAST(self):
    509         self.push_lit("223 3000234 <45223423 (at] example.com> retrieved")
    510 
    511     def handle_LIST(self, action=None, param=None):
    512         if action is None:
    513             self.push_lit("""\
    514                 215 Newsgroups in form "group high low flags".
    515                 comp.lang.python 0000052340 0000002828 y
    516                 comp.lang.python.announce 0000001153 0000000993 m
    517                 free.it.comp.lang.python 0000000002 0000000002 y
    518                 fr.comp.lang.python 0000001254 0000000760 y
    519                 free.it.comp.lang.python.learner 0000000000 0000000001 y
    520                 tw.bbs.comp.lang.python 0000000304 0000000304 y
    521                 .""")
    522         elif action == "ACTIVE":
    523             if param == "*distutils*":
    524                 self.push_lit("""\
    525                     215 Newsgroups in form "group high low flags"
    526                     gmane.comp.python.distutils.devel 0000014104 0000000001 m
    527                     gmane.comp.python.distutils.cvs 0000000000 0000000001 m
    528                     .""")
    529             else:
    530                 self.push_lit("""\
    531                     215 Newsgroups in form "group high low flags"
    532                     .""")
    533         elif action == "OVERVIEW.FMT":
    534             self.push_lit("""\
    535                 215 Order of fields in overview database.
    536                 Subject:
    537                 From:
    538                 Date:
    539                 Message-ID:
    540                 References:
    541                 Bytes:
    542                 Lines:
    543                 Xref:full
    544                 .""")
    545         elif action == "NEWSGROUPS":
    546             assert param is not None
    547             if param == "comp.lang.python":
    548                 self.push_lit("""\
    549                     215 Descriptions in form "group description".
    550                     comp.lang.python\tThe Python computer language.
    551                     .""")
    552             elif param == "comp.lang.python*":
    553                 self.push_lit("""\
    554                     215 Descriptions in form "group description".
    555                     comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
    556                     comp.lang.python\tThe Python computer language.
    557                     .""")
    558             else:
    559                 self.push_lit("""\
    560                     215 Descriptions in form "group description".
    561                     .""")
    562         else:
    563             self.push_lit('501 Unknown LIST keyword')
    564 
    565     def handle_NEWNEWS(self, group, date_str, time_str):
    566         # We hard code different return messages depending on passed
    567         # argument and date syntax.
    568         if (group == "comp.lang.python" and date_str == "20100913"
    569             and time_str == "082004"):
    570             # Date was passed in RFC 3977 format (NNTP "v2")
    571             self.push_lit("""\
    572                 230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
    573                 <a4929a40-6328-491a-aaaf-cb79ed7309a2 (at] q2g2000vbk.googlegroups.com>
    574                 <f30c0419-f549-4218-848f-d7d0131da931 (at] y3g2000vbm.googlegroups.com>
    575                 .""")
    576         elif (group == "comp.lang.python" and date_str == "100913"
    577             and time_str == "082004"):
    578             # Date was passed in RFC 977 format (NNTP "v1")
    579             self.push_lit("""\
    580                 230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
    581                 <a4929a40-6328-491a-aaaf-cb79ed7309a2 (at] q2g2000vbk.googlegroups.com>
    582                 <f30c0419-f549-4218-848f-d7d0131da931 (at] y3g2000vbm.googlegroups.com>
    583                 .""")
    584         elif (group == 'comp.lang.python' and
    585               date_str in ('20100101', '100101') and
    586               time_str == '090000'):
    587             self.push_lit('too long line' * 3000 +
    588                           '\n.')
    589         else:
    590             self.push_lit("""\
    591                 230 An empty list of newsarticles follows
    592                 .""")
    593         # (Note for experiments: many servers disable NEWNEWS.
    594         #  As of this writing, sicinfo3.epfl.ch doesn't.)
    595 
    596     def handle_XOVER(self, message_spec):
    597         if message_spec == "57-59":
    598             self.push_lit(
    599                 "224 Overview information for 57-58 follows\n"
    600                 "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
    601                     "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w (at] public.gmane.org>"
    602                     "\tSat, 19 Jun 2010 18:04:08 -0400"
    603                     "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C (at] gmail.com>"
    604                     "\t<hvalf7$ort$1 (at] dough.gmane.org>\t7103\t16"
    605                     "\tXref: news.gmane.org gmane.comp.python.authors:57"
    606                     "\n"
    607                 "58\tLooking for a few good bloggers"
    608                     "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w (at] public.gmane.org>"
    609                     "\tThu, 22 Jul 2010 09:14:14 -0400"
    610                     "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF (at] gmail.com>"
    611                     "\t\t6683\t16"
    612                     "\t"
    613                     "\n"
    614                 # A UTF-8 overview line from fr.comp.lang.python
    615                 "59\tRe: Message d'erreur incomprhensible (par moi)"
    616                     "\tEric Brunel <eric.brunel (at] pragmadev.nospam.com>"
    617                     "\tWed, 15 Sep 2010 18:09:15 +0200"
    618                     "\t<eric.brunel-2B8B56.18091515092010 (at] news.wanadoo.fr>"
    619                     "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
    620                     "\tXref: saria.nerim.net fr.comp.lang.python:1265"
    621                     "\n"
    622                 ".\n")
    623         else:
    624             self.push_lit("""\
    625                 224 No articles
    626                 .""")
    627 
    628     def handle_POST(self, *, body=None):
    629         if body is None:
    630             if self.allow_posting:
    631                 self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
    632                 self.expect_body()
    633             else:
    634                 self.push_lit("440 Posting not permitted")
    635         else:
    636             assert self.allow_posting
    637             self.push_lit("240 Article received OK")
    638             self.posted_body = body
    639 
    640     def handle_IHAVE(self, message_id, *, body=None):
    641         if body is None:
    642             if (self.allow_posting and
    643                 message_id == "<i.am.an.article.you.will.want (at] example.com>"):
    644                 self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
    645                 self.expect_body()
    646             else:
    647                 self.push_lit("435 Article not wanted")
    648         else:
    649             assert self.allow_posting
    650             self.push_lit("235 Article transferred OK")
    651             self.posted_body = body
    652 
    653     sample_head = """\
    654         From: "Demo User" <nobody (at] example.net>
    655         Subject: I am just a test article
    656         Content-Type: text/plain; charset=UTF-8; format=flowed
    657         Message-ID: <i.am.an.article.you.will.want (at] example.com>"""
    658 
    659     sample_body = """\
    660         This is just a test article.
    661         ..Here is a dot-starting line.
    662 
    663         -- Signed by Andr\xe9."""
    664 
    665     sample_article = sample_head + "\n\n" + sample_body
    666 
    667     def handle_ARTICLE(self, message_spec=None):
    668         if message_spec is None:
    669             self.push_lit("220 3000237 <45223423 (at] example.com>")
    670         elif message_spec == "<45223423 (at] example.com>":
    671             self.push_lit("220 0 <45223423 (at] example.com>")
    672         elif message_spec == "3000234":
    673             self.push_lit("220 3000234 <45223423 (at] example.com>")
    674         else:
    675             self.push_lit("430 No Such Article Found")
    676             return
    677         self.push_lit(self.sample_article)
    678         self.push_lit(".")
    679 
    680     def handle_HEAD(self, message_spec=None):
    681         if message_spec is None:
    682             self.push_lit("221 3000237 <45223423 (at] example.com>")
    683         elif message_spec == "<45223423 (at] example.com>":
    684             self.push_lit("221 0 <45223423 (at] example.com>")
    685         elif message_spec == "3000234":
    686             self.push_lit("221 3000234 <45223423 (at] example.com>")
    687         else:
    688             self.push_lit("430 No Such Article Found")
    689             return
    690         self.push_lit(self.sample_head)
    691         self.push_lit(".")
    692 
    693     def handle_BODY(self, message_spec=None):
    694         if message_spec is None:
    695             self.push_lit("222 3000237 <45223423 (at] example.com>")
    696         elif message_spec == "<45223423 (at] example.com>":
    697             self.push_lit("222 0 <45223423 (at] example.com>")
    698         elif message_spec == "3000234":
    699             self.push_lit("222 3000234 <45223423 (at] example.com>")
    700         else:
    701             self.push_lit("430 No Such Article Found")
    702             return
    703         self.push_lit(self.sample_body)
    704         self.push_lit(".")
    705 
    def handle_AUTHINFO(self, cred_type, data):
        """Answer an AUTHINFO command for the given credential type.

        *cred_type* is 'user' or 'pass'; *data* (the credential itself)
        is accepted but never inspected.  Once logged in, every further
        AUTHINFO is refused with 502.  An unknown *cred_type* raises
        Exception.
        """
        if self._logged_in:
            self.push_lit('502 Already Logged In')
            return
        if cred_type == 'pass':
            # Any password is accepted.
            self.push_lit('281 Login Successful')
            self._logged_in = True
            return
        if cred_type != 'user':
            raise Exception('Unknown cred type {}'.format(cred_type))
        if self._user_sent:
            self.push_lit('482 User Credential Already Sent')
            return
        self.push_lit('381 Password Required')
        self._user_sent = True
    720 
    721 
    722 class NNTPv2Handler(NNTPv1Handler):
    723     """A handler for RFC 3977 (NNTP "v2")"""
    724 
    725     def handle_CAPABILITIES(self):
    726         fmt = """\
    727             101 Capability list:
    728             VERSION 2 3
    729             IMPLEMENTATION INN 2.5.1{}
    730             HDR
    731             LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
    732             OVER
    733             POST
    734             READER
    735             ."""
    736 
    737         if not self._logged_in:
    738             self.push_lit(fmt.format('\n            AUTHINFO USER'))
    739         else:
    740             self.push_lit(fmt.format(''))
    741 
    742     def handle_MODE(self, _):
    743         raise Exception('MODE READER sent despite READER has been advertised')
    744 
    745     def handle_OVER(self, message_spec=None):
    746         return self.handle_XOVER(message_spec)
    747 
    748 
    749 class CapsAfterLoginNNTPv2Handler(NNTPv2Handler):
    750     """A handler that allows CAPABILITIES only after login"""
    751 
    752     def handle_CAPABILITIES(self):
    753         if not self._logged_in:
    754             self.push_lit('480 You must log in.')
    755         else:
    756             super().handle_CAPABILITIES()
    757 
    758 
    759 class ModeSwitchingNNTPv2Handler(NNTPv2Handler):
    760     """A server that starts in transit mode"""
    761 
    762     def __init__(self):
    763         self._switched = False
    764 
    765     def handle_CAPABILITIES(self):
    766         fmt = """\
    767             101 Capability list:
    768             VERSION 2 3
    769             IMPLEMENTATION INN 2.5.1
    770             HDR
    771             LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
    772             OVER
    773             POST
    774             {}READER
    775             ."""
    776         if self._switched:
    777             self.push_lit(fmt.format(''))
    778         else:
    779             self.push_lit(fmt.format('MODE-'))
    780 
    781     def handle_MODE(self, what):
    782         assert not self._switched and what == 'reader'
    783         self._switched = True
    784         self.push_lit('200 Posting allowed')
    785 
    786 
    787 class NNTPv1v2TestsMixin:
    788 
    789     def setUp(self):
    790         super().setUp()
    791 
    792     def test_welcome(self):
    793         self.assertEqual(self.server.welcome, self.handler.welcome)
    794 
    795     def test_authinfo(self):
    796         if self.nntp_version == 2:
    797             self.assertIn('AUTHINFO', self.server._caps)
    798         self.server.login('testuser', 'testpw')
    799         # if AUTHINFO is gone from _caps we also know that getcapabilities()
    800         # has been called after login as it should
    801         self.assertNotIn('AUTHINFO', self.server._caps)
    802 
    803     def test_date(self):
    804         resp, date = self.server.date()
    805         self.assertEqual(resp, "111 20100914001155")
    806         self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55))
    807 
    808     def test_quit(self):
    809         self.assertFalse(self.sio.closed)
    810         resp = self.server.quit()
    811         self.assertEqual(resp, "205 Bye!")
    812         self.assertTrue(self.sio.closed)
    813 
    814     def test_help(self):
    815         resp, help = self.server.help()
    816         self.assertEqual(resp, "100 Legal commands")
    817         self.assertEqual(help, [
    818             '  authinfo user Name|pass Password|generic <prog> <args>',
    819             '  date',
    820             '  help',
    821             'Report problems to <root (at] example.org>',
    822         ])
    823 
    824     def test_list(self):
    825         resp, groups = self.server.list()
    826         self.assertEqual(len(groups), 6)
    827         g = groups[1]
    828         self.assertEqual(g,
    829             GroupInfo("comp.lang.python.announce", "0000001153",
    830                       "0000000993", "m"))
    831         resp, groups = self.server.list("*distutils*")
    832         self.assertEqual(len(groups), 2)
    833         g = groups[0]
    834         self.assertEqual(g,
    835             GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
    836                       "0000000001", "m"))
    837 
    838     def test_stat(self):
    839         resp, art_num, message_id = self.server.stat(3000234)
    840         self.assertEqual(resp, "223 3000234 <45223423 (at] example.com>")
    841         self.assertEqual(art_num, 3000234)
    842         self.assertEqual(message_id, "<45223423 (at] example.com>")
    843         resp, art_num, message_id = self.server.stat("<45223423 (at] example.com>")
    844         self.assertEqual(resp, "223 0 <45223423 (at] example.com>")
    845         self.assertEqual(art_num, 0)
    846         self.assertEqual(message_id, "<45223423 (at] example.com>")
    847         with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
    848             self.server.stat("<non.existent.id>")
    849         self.assertEqual(cm.exception.response, "430 No Such Article Found")
    850         with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
    851             self.server.stat()
    852         self.assertEqual(cm.exception.response, "412 No newsgroup selected")
    853 
    854     def test_next(self):
    855         resp, art_num, message_id = self.server.next()
    856         self.assertEqual(resp, "223 3000237 <668929 (at] example.org> retrieved")
    857         self.assertEqual(art_num, 3000237)
    858         self.assertEqual(message_id, "<668929 (at] example.org>")
    859 
    860     def test_last(self):
    861         resp, art_num, message_id = self.server.last()
    862         self.assertEqual(resp, "223 3000234 <45223423 (at] example.com> retrieved")
    863         self.assertEqual(art_num, 3000234)
    864         self.assertEqual(message_id, "<45223423 (at] example.com>")
    865 
    866     def test_description(self):
    867         desc = self.server.description("comp.lang.python")
    868         self.assertEqual(desc, "The Python computer language.")
    869         desc = self.server.description("comp.lang.pythonx")
    870         self.assertEqual(desc, "")
    871 
    872     def test_descriptions(self):
    873         resp, groups = self.server.descriptions("comp.lang.python")
    874         self.assertEqual(resp, '215 Descriptions in form "group description".')
    875         self.assertEqual(groups, {
    876             "comp.lang.python": "The Python computer language.",
    877             })
    878         resp, groups = self.server.descriptions("comp.lang.python*")
    879         self.assertEqual(groups, {
    880             "comp.lang.python": "The Python computer language.",
    881             "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
    882             })
    883         resp, groups = self.server.descriptions("comp.lang.pythonx")
    884         self.assertEqual(groups, {})
    885 
    886     def test_group(self):
    887         resp, count, first, last, group = self.server.group("fr.comp.lang.python")
    888         self.assertTrue(resp.startswith("211 "), resp)
    889         self.assertEqual(first, 761)
    890         self.assertEqual(last, 1265)
    891         self.assertEqual(count, 486)
    892         self.assertEqual(group, "fr.comp.lang.python")
    893         with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
    894             self.server.group("comp.lang.python.devel")
    895         exc = cm.exception
    896         self.assertTrue(exc.response.startswith("411 No such group"),
    897                         exc.response)
    898 
    899     def test_newnews(self):
    900         # NEWNEWS comp.lang.python [20]100913 082004
    901         dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
    902         resp, ids = self.server.newnews("comp.lang.python", dt)
    903         expected = (
    904             "230 list of newsarticles (NNTP v{0}) "
    905             "created after Mon Sep 13 08:20:04 2010 follows"
    906             ).format(self.nntp_version)
    907         self.assertEqual(resp, expected)
    908         self.assertEqual(ids, [
    909             "<a4929a40-6328-491a-aaaf-cb79ed7309a2 (at] q2g2000vbk.googlegroups.com>",
    910             "<f30c0419-f549-4218-848f-d7d0131da931 (at] y3g2000vbm.googlegroups.com>",
    911             ])
    912         # NEWNEWS fr.comp.lang.python [20]100913 082004
    913         dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
    914         resp, ids = self.server.newnews("fr.comp.lang.python", dt)
    915         self.assertEqual(resp, "230 An empty list of newsarticles follows")
    916         self.assertEqual(ids, [])
    917 
    918     def _check_article_body(self, lines):
    919         self.assertEqual(len(lines), 4)
    920         self.assertEqual(lines[-1].decode('utf-8'), "-- Signed by Andr.")
    921         self.assertEqual(lines[-2], b"")
    922         self.assertEqual(lines[-3], b".Here is a dot-starting line.")
    923         self.assertEqual(lines[-4], b"This is just a test article.")
    924 
    925     def _check_article_head(self, lines):
    926         self.assertEqual(len(lines), 4)
    927         self.assertEqual(lines[0], b'From: "Demo User" <nobody (at] example.net>')
    928         self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want (at] example.com>")
    929 
    930     def _check_article_data(self, lines):
    931         self.assertEqual(len(lines), 9)
    932         self._check_article_head(lines[:4])
    933         self._check_article_body(lines[-4:])
    934         self.assertEqual(lines[4], b"")
    935 
    936     def test_article(self):
    937         # ARTICLE
    938         resp, info = self.server.article()
    939         self.assertEqual(resp, "220 3000237 <45223423 (at] example.com>")
    940         art_num, message_id, lines = info
    941         self.assertEqual(art_num, 3000237)
    942         self.assertEqual(message_id, "<45223423 (at] example.com>")
    943         self._check_article_data(lines)
    944         # ARTICLE num
    945         resp, info = self.server.article(3000234)
    946         self.assertEqual(resp, "220 3000234 <45223423 (at] example.com>")
    947         art_num, message_id, lines = info
    948         self.assertEqual(art_num, 3000234)
    949         self.assertEqual(message_id, "<45223423 (at] example.com>")
    950         self._check_article_data(lines)
    951         # ARTICLE id
    952         resp, info = self.server.article("<45223423 (at] example.com>")
    953         self.assertEqual(resp, "220 0 <45223423 (at] example.com>")
    954         art_num, message_id, lines = info
    955         self.assertEqual(art_num, 0)
    956         self.assertEqual(message_id, "<45223423 (at] example.com>")
    957         self._check_article_data(lines)
    958         # Non-existent id
    959         with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
    960             self.server.article("<non-existent (at] example.com>")
    961         self.assertEqual(cm.exception.response, "430 No Such Article Found")
    962 
    963     def test_article_file(self):
    964         # With a "file" argument
    965         f = io.BytesIO()
    966         resp, info = self.server.article(file=f)
    967         self.assertEqual(resp, "220 3000237 <45223423 (at] example.com>")
    968         art_num, message_id, lines = info
    969         self.assertEqual(art_num, 3000237)
    970         self.assertEqual(message_id, "<45223423 (at] example.com>")
    971         self.assertEqual(lines, [])
    972         data = f.getvalue()
    973         self.assertTrue(data.startswith(
    974             b'From: "Demo User" <nobody (at] example.net>\r\n'
    975             b'Subject: I am just a test article\r\n'
    976             ), ascii(data))
    977         self.assertTrue(data.endswith(
    978             b'This is just a test article.\r\n'
    979             b'.Here is a dot-starting line.\r\n'
    980             b'\r\n'
    981             b'-- Signed by Andr\xc3\xa9.\r\n'
    982             ), ascii(data))
    983 
    984     def test_head(self):
    985         # HEAD
    986         resp, info = self.server.head()
    987         self.assertEqual(resp, "221 3000237 <45223423 (at] example.com>")
    988         art_num, message_id, lines = info
    989         self.assertEqual(art_num, 3000237)
    990         self.assertEqual(message_id, "<45223423 (at] example.com>")
    991         self._check_article_head(lines)
    992         # HEAD num
    993         resp, info = self.server.head(3000234)
    994         self.assertEqual(resp, "221 3000234 <45223423 (at] example.com>")
    995         art_num, message_id, lines = info
    996         self.assertEqual(art_num, 3000234)
    997         self.assertEqual(message_id, "<45223423 (at] example.com>")
    998         self._check_article_head(lines)
    999         # HEAD id
   1000         resp, info = self.server.head("<45223423 (at] example.com>")
   1001         self.assertEqual(resp, "221 0 <45223423 (at] example.com>")
   1002         art_num, message_id, lines = info
   1003         self.assertEqual(art_num, 0)
   1004         self.assertEqual(message_id, "<45223423 (at] example.com>")
   1005         self._check_article_head(lines)
   1006         # Non-existent id
   1007         with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
   1008             self.server.head("<non-existent (at] example.com>")
   1009         self.assertEqual(cm.exception.response, "430 No Such Article Found")
   1010 
   1011     def test_head_file(self):
   1012         f = io.BytesIO()
   1013         resp, info = self.server.head(file=f)
   1014         self.assertEqual(resp, "221 3000237 <45223423 (at] example.com>")
   1015         art_num, message_id, lines = info
   1016         self.assertEqual(art_num, 3000237)
   1017         self.assertEqual(message_id, "<45223423 (at] example.com>")
   1018         self.assertEqual(lines, [])
   1019         data = f.getvalue()
   1020         self.assertTrue(data.startswith(
   1021             b'From: "Demo User" <nobody (at] example.net>\r\n'
   1022             b'Subject: I am just a test article\r\n'
   1023             ), ascii(data))
   1024         self.assertFalse(data.endswith(
   1025             b'This is just a test article.\r\n'
   1026             b'.Here is a dot-starting line.\r\n'
   1027             b'\r\n'
   1028             b'-- Signed by Andr\xc3\xa9.\r\n'
   1029             ), ascii(data))
   1030 
   1031     def test_body(self):
   1032         # BODY
   1033         resp, info = self.server.body()
   1034         self.assertEqual(resp, "222 3000237 <45223423 (at] example.com>")
   1035         art_num, message_id, lines = info
   1036         self.assertEqual(art_num, 3000237)
   1037         self.assertEqual(message_id, "<45223423 (at] example.com>")
   1038         self._check_article_body(lines)
   1039         # BODY num
   1040         resp, info = self.server.body(3000234)
   1041         self.assertEqual(resp, "222 3000234 <45223423 (at] example.com>")
   1042         art_num, message_id, lines = info
   1043         self.assertEqual(art_num, 3000234)
   1044         self.assertEqual(message_id, "<45223423 (at] example.com>")
   1045         self._check_article_body(lines)
   1046         # BODY id
   1047         resp, info = self.server.body("<45223423 (at] example.com>")
   1048         self.assertEqual(resp, "222 0 <45223423 (at] example.com>")
   1049         art_num, message_id, lines = info
   1050         self.assertEqual(art_num, 0)
   1051         self.assertEqual(message_id, "<45223423 (at] example.com>")
   1052         self._check_article_body(lines)
   1053         # Non-existent id
   1054         with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
   1055             self.server.body("<non-existent (at] example.com>")
   1056         self.assertEqual(cm.exception.response, "430 No Such Article Found")
   1057 
   1058     def test_body_file(self):
   1059         f = io.BytesIO()
   1060         resp, info = self.server.body(file=f)
   1061         self.assertEqual(resp, "222 3000237 <45223423 (at] example.com>")
   1062         art_num, message_id, lines = info
   1063         self.assertEqual(art_num, 3000237)
   1064         self.assertEqual(message_id, "<45223423 (at] example.com>")
   1065         self.assertEqual(lines, [])
   1066         data = f.getvalue()
   1067         self.assertFalse(data.startswith(
   1068             b'From: "Demo User" <nobody (at] example.net>\r\n'
   1069             b'Subject: I am just a test article\r\n'
   1070             ), ascii(data))
   1071         self.assertTrue(data.endswith(
   1072             b'This is just a test article.\r\n'
   1073             b'.Here is a dot-starting line.\r\n'
   1074             b'\r\n'
   1075             b'-- Signed by Andr\xc3\xa9.\r\n'
   1076             ), ascii(data))
   1077 
   1078     def check_over_xover_resp(self, resp, overviews):
   1079         self.assertTrue(resp.startswith("224 "), resp)
   1080         self.assertEqual(len(overviews), 3)
   1081         art_num, over = overviews[0]
   1082         self.assertEqual(art_num, 57)
   1083         self.assertEqual(over, {
   1084             "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w (at] public.gmane.org>",
   1085             "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
   1086             "date": "Sat, 19 Jun 2010 18:04:08 -0400",
   1087             "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C (at] gmail.com>",
   1088             "references": "<hvalf7$ort$1 (at] dough.gmane.org>",
   1089             ":bytes": "7103",
   1090             ":lines": "16",
   1091             "xref": "news.gmane.org gmane.comp.python.authors:57"
   1092             })
   1093         art_num, over = overviews[1]
   1094         self.assertEqual(over["xref"], None)
   1095         art_num, over = overviews[2]
   1096         self.assertEqual(over["subject"],
   1097                          "Re: Message d'erreur incomprhensible (par moi)")
   1098 
   1099     def test_xover(self):
   1100         resp, overviews = self.server.xover(57, 59)
   1101         self.check_over_xover_resp(resp, overviews)
   1102 
   1103     def test_over(self):
   1104         # In NNTP "v1", this will fallback on XOVER
   1105         resp, overviews = self.server.over((57, 59))
   1106         self.check_over_xover_resp(resp, overviews)
   1107 
   1108     sample_post = (
   1109         b'From: "Demo User" <nobody (at] example.net>\r\n'
   1110         b'Subject: I am just a test article\r\n'
   1111         b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n'
   1112         b'Message-ID: <i.am.an.article.you.will.want (at] example.com>\r\n'
   1113         b'\r\n'
   1114         b'This is just a test article.\r\n'
   1115         b'.Here is a dot-starting line.\r\n'
   1116         b'\r\n'
   1117         b'-- Signed by Andr\xc3\xa9.\r\n'
   1118     )
   1119 
   1120     def _check_posted_body(self):
   1121         # Check the raw body as received by the server
   1122         lines = self.handler.posted_body
   1123         # One additional line for the "." terminator
   1124         self.assertEqual(len(lines), 10)
   1125         self.assertEqual(lines[-1], b'.\r\n')
   1126         self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
   1127         self.assertEqual(lines[-3], b'\r\n')
   1128         self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n')
   1129         self.assertEqual(lines[0], b'From: "Demo User" <nobody (at] example.net>\r\n')
   1130 
   1131     def _check_post_ihave_sub(self, func, *args, file_factory):
   1132         # First the prepared post with CRLF endings
   1133         post = self.sample_post
   1134         func_args = args + (file_factory(post),)
   1135         self.handler.posted_body = None
   1136         resp = func(*func_args)
   1137         self._check_posted_body()
   1138         # Then the same post with "normal" line endings - they should be
   1139         # converted by NNTP.post and NNTP.ihave.
   1140         post = self.sample_post.replace(b"\r\n", b"\n")
   1141         func_args = args + (file_factory(post),)
   1142         self.handler.posted_body = None
   1143         resp = func(*func_args)
   1144         self._check_posted_body()
   1145         return resp
   1146 
   1147     def check_post_ihave(self, func, success_resp, *args):
   1148         # With a bytes object
   1149         resp = self._check_post_ihave_sub(func, *args, file_factory=bytes)
   1150         self.assertEqual(resp, success_resp)
   1151         # With a bytearray object
   1152         resp = self._check_post_ihave_sub(func, *args, file_factory=bytearray)
   1153         self.assertEqual(resp, success_resp)
   1154         # With a file object
   1155         resp = self._check_post_ihave_sub(func, *args, file_factory=io.BytesIO)
   1156         self.assertEqual(resp, success_resp)
   1157         # With an iterable of terminated lines
   1158         def iterlines(b):
   1159             return iter(b.splitlines(keepends=True))
   1160         resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
   1161         self.assertEqual(resp, success_resp)
   1162         # With an iterable of non-terminated lines
   1163         def iterlines(b):
   1164             return iter(b.splitlines(keepends=False))
   1165         resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
   1166         self.assertEqual(resp, success_resp)
   1167 
   1168     def test_post(self):
   1169         self.check_post_ihave(self.server.post, "240 Article received OK")
   1170         self.handler.allow_posting = False
   1171         with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
   1172             self.server.post(self.sample_post)
   1173         self.assertEqual(cm.exception.response,
   1174                          "440 Posting not permitted")
   1175 
   1176     def test_ihave(self):
   1177         self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
   1178                               "<i.am.an.article.you.will.want (at] example.com>")
   1179         with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
   1180             self.server.ihave("<another.message.id>", self.sample_post)
   1181         self.assertEqual(cm.exception.response,
   1182                          "435 Article not wanted")
   1183 
   1184     def test_too_long_lines(self):
   1185         dt = datetime.datetime(2010, 1, 1, 9, 0, 0)
   1186         self.assertRaises(nntplib.NNTPDataError,
   1187                           self.server.newnews, "comp.lang.python", dt)
   1188 
   1189 
   1190 class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
   1191     """Tests an NNTP v1 server (no capabilities)."""
   1192 
   1193     nntp_version = 1
   1194     handler_class = NNTPv1Handler
   1195 
   1196     def test_caps(self):
   1197         caps = self.server.getcapabilities()
   1198         self.assertEqual(caps, {})
   1199         self.assertEqual(self.server.nntp_version, 1)
   1200         self.assertEqual(self.server.nntp_implementation, None)
   1201 
   1202 
   1203 class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
   1204     """Tests an NNTP v2 server (with capabilities)."""
   1205 
   1206     nntp_version = 2
   1207     handler_class = NNTPv2Handler
   1208 
   1209     def test_caps(self):
   1210         caps = self.server.getcapabilities()
   1211         self.assertEqual(caps, {
   1212             'VERSION': ['2', '3'],
   1213             'IMPLEMENTATION': ['INN', '2.5.1'],
   1214             'AUTHINFO': ['USER'],
   1215             'HDR': [],
   1216             'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
   1217                      'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
   1218             'OVER': [],
   1219             'POST': [],
   1220             'READER': [],
   1221             })
   1222         self.assertEqual(self.server.nntp_version, 3)
   1223         self.assertEqual(self.server.nntp_implementation, 'INN 2.5.1')
   1224 
   1225 
   1226 class CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase):
   1227     """Tests a probably NNTP v2 server with capabilities only after login."""
   1228 
   1229     nntp_version = 2
   1230     handler_class = CapsAfterLoginNNTPv2Handler
   1231 
   1232     def test_caps_only_after_login(self):
   1233         self.assertEqual(self.server._caps, {})
   1234         self.server.login('testuser', 'testpw')
   1235         self.assertIn('VERSION', self.server._caps)
   1236 
   1237 
   1238 class SendReaderNNTPv2Tests(MockedNNTPWithReaderModeMixin,
   1239         unittest.TestCase):
   1240     """Same tests as for v2 but we tell NTTP to send MODE READER to a server
   1241     that isn't in READER mode by default."""
   1242 
   1243     nntp_version = 2
   1244     handler_class = ModeSwitchingNNTPv2Handler
   1245 
   1246     def test_we_are_in_reader_mode_after_connect(self):
   1247         self.assertIn('READER', self.server._caps)
   1248 
   1249 
   1250 class MiscTests(unittest.TestCase):
   1251 
   1252     def test_decode_header(self):
   1253         def gives(a, b):
   1254             self.assertEqual(nntplib.decode_header(a), b)
   1255         gives("" , "")
   1256         gives("a plain header", "a plain header")
   1257         gives(" with extra  spaces ", " with extra  spaces ")
   1258         gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Dbuter en Python")
   1259         gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
   1260               " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
   1261               "Re: [sqlite] problme avec ORDER BY sur des chanes de caractres accentues")
   1262         gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
   1263               "Re: problme de matrice")
   1264         # A natively utf-8 header (found in the real world!)
   1265         gives("Re: Message d'erreur incomprhensible (par moi)",
   1266               "Re: Message d'erreur incomprhensible (par moi)")
   1267 
   1268     def test_parse_overview_fmt(self):
   1269         # The minimal (default) response
   1270         lines = ["Subject:", "From:", "Date:", "Message-ID:",
   1271                  "References:", ":bytes", ":lines"]
   1272         self.assertEqual(nntplib._parse_overview_fmt(lines),
   1273             ["subject", "from", "date", "message-id", "references",
   1274              ":bytes", ":lines"])
   1275         # The minimal response using alternative names
   1276         lines = ["Subject:", "From:", "Date:", "Message-ID:",
   1277                  "References:", "Bytes:", "Lines:"]
   1278         self.assertEqual(nntplib._parse_overview_fmt(lines),
   1279             ["subject", "from", "date", "message-id", "references",
   1280              ":bytes", ":lines"])
   1281         # Variations in casing
   1282         lines = ["subject:", "FROM:", "DaTe:", "message-ID:",
   1283                  "References:", "BYTES:", "Lines:"]
   1284         self.assertEqual(nntplib._parse_overview_fmt(lines),
   1285             ["subject", "from", "date", "message-id", "references",
   1286              ":bytes", ":lines"])
   1287         # First example from RFC 3977
   1288         lines = ["Subject:", "From:", "Date:", "Message-ID:",
   1289                  "References:", ":bytes", ":lines", "Xref:full",
   1290                  "Distribution:full"]
   1291         self.assertEqual(nntplib._parse_overview_fmt(lines),
   1292             ["subject", "from", "date", "message-id", "references",
   1293              ":bytes", ":lines", "xref", "distribution"])
   1294         # Second example from RFC 3977
   1295         lines = ["Subject:", "From:", "Date:", "Message-ID:",
   1296                  "References:", "Bytes:", "Lines:", "Xref:FULL",
   1297                  "Distribution:FULL"]
   1298         self.assertEqual(nntplib._parse_overview_fmt(lines),
   1299             ["subject", "from", "date", "message-id", "references",
   1300              ":bytes", ":lines", "xref", "distribution"])
   1301         # A classic response from INN
   1302         lines = ["Subject:", "From:", "Date:", "Message-ID:",
   1303                  "References:", "Bytes:", "Lines:", "Xref:full"]
   1304         self.assertEqual(nntplib._parse_overview_fmt(lines),
   1305             ["subject", "from", "date", "message-id", "references",
   1306              ":bytes", ":lines", "xref"])
   1307 
   1308     def test_parse_overview(self):
   1309         fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
   1310         # First example from RFC 3977
   1311         lines = [
   1312             '3000234\tI am just a test article\t"Demo User" '
   1313             '<nobody (at] example.com>\t6 Oct 1998 04:38:40 -0500\t'
   1314             '<45223423 (at] example.com>\t<45454 (at] example.net>\t1234\t'
   1315             '17\tXref: news.example.com misc.test:3000363',
   1316         ]
   1317         overview = nntplib._parse_overview(lines, fmt)
   1318         (art_num, fields), = overview
   1319         self.assertEqual(art_num, 3000234)
   1320         self.assertEqual(fields, {
   1321             'subject': 'I am just a test article',
   1322             'from': '"Demo User" <nobody (at] example.com>',
   1323             'date': '6 Oct 1998 04:38:40 -0500',
   1324             'message-id': '<45223423 (at] example.com>',
   1325             'references': '<45454 (at] example.net>',
   1326             ':bytes': '1234',
   1327             ':lines': '17',
   1328             'xref': 'news.example.com misc.test:3000363',
   1329         })
   1330         # Second example; here the "Xref" field is totally absent (including
   1331         # the header name) and comes out as None
   1332         lines = [
   1333             '3000234\tI am just a test article\t"Demo User" '
   1334             '<nobody (at] example.com>\t6 Oct 1998 04:38:40 -0500\t'
   1335             '<45223423 (at] example.com>\t<45454 (at] example.net>\t1234\t'
   1336             '17\t\t',
   1337         ]
   1338         overview = nntplib._parse_overview(lines, fmt)
   1339         (art_num, fields), = overview
   1340         self.assertEqual(fields['xref'], None)
   1341         # Third example; the "Xref" is an empty string, while "references"
   1342         # is a single space.
   1343         lines = [
   1344             '3000234\tI am just a test article\t"Demo User" '
   1345             '<nobody (at] example.com>\t6 Oct 1998 04:38:40 -0500\t'
   1346             '<45223423 (at] example.com>\t \t1234\t'
   1347             '17\tXref: \t',
   1348         ]
   1349         overview = nntplib._parse_overview(lines, fmt)
   1350         (art_num, fields), = overview
   1351         self.assertEqual(fields['references'], ' ')
   1352         self.assertEqual(fields['xref'], '')
   1353 
   1354     def test_parse_datetime(self):
   1355         def gives(a, b, *c):
   1356             self.assertEqual(nntplib._parse_datetime(a, b),
   1357                              datetime.datetime(*c))
   1358         # Output of DATE command
   1359         gives("19990623135624", None, 1999, 6, 23, 13, 56, 24)
   1360         # Variations
   1361         gives("19990623", "135624", 1999, 6, 23, 13, 56, 24)
   1362         gives("990623", "135624", 1999, 6, 23, 13, 56, 24)
   1363         gives("090623", "135624", 2009, 6, 23, 13, 56, 24)
   1364 
   1365     def test_unparse_datetime(self):
   1366         # Test non-legacy mode
   1367         # 1) with a datetime
   1368         def gives(y, M, d, h, m, s, date_str, time_str):
   1369             dt = datetime.datetime(y, M, d, h, m, s)
   1370             self.assertEqual(nntplib._unparse_datetime(dt),
   1371                              (date_str, time_str))
   1372             self.assertEqual(nntplib._unparse_datetime(dt, False),
   1373                              (date_str, time_str))
   1374         gives(1999, 6, 23, 13, 56, 24, "19990623", "135624")
   1375         gives(2000, 6, 23, 13, 56, 24, "20000623", "135624")
   1376         gives(2010, 6, 5, 1, 2, 3, "20100605", "010203")
   1377         # 2) with a date
   1378         def gives(y, M, d, date_str, time_str):
   1379             dt = datetime.date(y, M, d)
   1380             self.assertEqual(nntplib._unparse_datetime(dt),
   1381                              (date_str, time_str))
   1382             self.assertEqual(nntplib._unparse_datetime(dt, False),
   1383                              (date_str, time_str))
   1384         gives(1999, 6, 23, "19990623", "000000")
   1385         gives(2000, 6, 23, "20000623", "000000")
   1386         gives(2010, 6, 5, "20100605", "000000")
   1387 
   1388     def test_unparse_datetime_legacy(self):
   1389         # Test legacy mode (RFC 977)
   1390         # 1) with a datetime
   1391         def gives(y, M, d, h, m, s, date_str, time_str):
   1392             dt = datetime.datetime(y, M, d, h, m, s)
   1393             self.assertEqual(nntplib._unparse_datetime(dt, True),
   1394                              (date_str, time_str))
   1395         gives(1999, 6, 23, 13, 56, 24, "990623", "135624")
   1396         gives(2000, 6, 23, 13, 56, 24, "000623", "135624")
   1397         gives(2010, 6, 5, 1, 2, 3, "100605", "010203")
   1398         # 2) with a date
   1399         def gives(y, M, d, date_str, time_str):
   1400             dt = datetime.date(y, M, d)
   1401             self.assertEqual(nntplib._unparse_datetime(dt, True),
   1402                              (date_str, time_str))
   1403         gives(1999, 6, 23, "990623", "000000")
   1404         gives(2000, 6, 23, "000623", "000000")
   1405         gives(2010, 6, 5, "100605", "000000")
   1406 
   1407     @unittest.skipUnless(ssl, 'requires SSL support')
   1408     def test_ssl_support(self):
   1409         self.assertTrue(hasattr(nntplib, 'NNTP_SSL'))
   1410 
   1411 
   1412 class PublicAPITests(unittest.TestCase):
   1413     """Ensures that the correct values are exposed in the public API."""
   1414 
   1415     def test_module_all_attribute(self):
   1416         self.assertTrue(hasattr(nntplib, '__all__'))
   1417         target_api = ['NNTP', 'NNTPError', 'NNTPReplyError',
   1418                       'NNTPTemporaryError', 'NNTPPermanentError',
   1419                       'NNTPProtocolError', 'NNTPDataError', 'decode_header']
   1420         if ssl is not None:
   1421             target_api.append('NNTP_SSL')
   1422         self.assertEqual(set(nntplib.__all__), set(target_api))
   1423 
   1424 class MockSocketTests(unittest.TestCase):
   1425     """Tests involving a mock socket object
   1426 
   1427     Used where the _NNTPServerIO file object is not enough."""
   1428 
   1429     nntp_class = nntplib.NNTP
   1430 
   1431     def check_constructor_error_conditions(
   1432             self, handler_class,
   1433             expected_error_type, expected_error_msg,
   1434             login=None, password=None):
   1435 
   1436         class mock_socket_module:
   1437             def create_connection(address, timeout):
   1438                 return MockSocket()
   1439 
   1440         class MockSocket:
   1441             def close(self):
   1442                 nonlocal socket_closed
   1443                 socket_closed = True
   1444 
   1445             def makefile(socket, mode):
   1446                 handler = handler_class()
   1447                 _, file = make_mock_file(handler)
   1448                 files.append(file)
   1449                 return file
   1450 
   1451         socket_closed = False
   1452         files = []
   1453         with patch('nntplib.socket', mock_socket_module), \
   1454              self.assertRaisesRegex(expected_error_type, expected_error_msg):
   1455             self.nntp_class('dummy', user=login, password=password)
   1456         self.assertTrue(socket_closed)
   1457         for f in files:
   1458             self.assertTrue(f.closed)
   1459 
   1460     def test_bad_welcome(self):
   1461         #Test a bad welcome message
   1462         class Handler(NNTPv1Handler):
   1463             welcome = 'Bad Welcome'
   1464         self.check_constructor_error_conditions(
   1465             Handler, nntplib.NNTPProtocolError, Handler.welcome)
   1466 
   1467     def test_service_temporarily_unavailable(self):
   1468         #Test service temporarily unavailable
   1469         class Handler(NNTPv1Handler):
   1470             welcome = '400 Service temporarily unavailable'
   1471         self.check_constructor_error_conditions(
   1472             Handler, nntplib.NNTPTemporaryError, Handler.welcome)
   1473 
   1474     def test_service_permanently_unavailable(self):
   1475         #Test service permanently unavailable
   1476         class Handler(NNTPv1Handler):
   1477             welcome = '502 Service permanently unavailable'
   1478         self.check_constructor_error_conditions(
   1479             Handler, nntplib.NNTPPermanentError, Handler.welcome)
   1480 
   1481     def test_bad_capabilities(self):
   1482         #Test a bad capabilities response
   1483         class Handler(NNTPv1Handler):
   1484             def handle_CAPABILITIES(self):
   1485                 self.push_lit(capabilities_response)
   1486         capabilities_response = '201 bad capability'
   1487         self.check_constructor_error_conditions(
   1488             Handler, nntplib.NNTPReplyError, capabilities_response)
   1489 
   1490     def test_login_aborted(self):
   1491         #Test a bad authinfo response
   1492         login = 't (at] e.com'
   1493         password = 'python'
   1494         class Handler(NNTPv1Handler):
   1495             def handle_AUTHINFO(self, *args):
   1496                 self.push_lit(authinfo_response)
   1497         authinfo_response = '503 Mechanism not recognized'
   1498         self.check_constructor_error_conditions(
   1499             Handler, nntplib.NNTPPermanentError, authinfo_response,
   1500             login, password)
   1501 
   1502 class bypass_context:
   1503     """Bypass encryption and actual SSL module"""
   1504     def wrap_socket(sock, **args):
   1505         return sock
   1506 
   1507 @unittest.skipUnless(ssl, 'requires SSL support')
   1508 class MockSslTests(MockSocketTests):
   1509     @staticmethod
   1510     def nntp_class(*pos, **kw):
   1511         return nntplib.NNTP_SSL(*pos, ssl_context=bypass_context, **kw)
   1512 
   1513 @unittest.skipUnless(threading, 'requires multithreading')
   1514 class LocalServerTests(unittest.TestCase):
   1515     def setUp(self):
   1516         sock = socket.socket()
   1517         port = support.bind_port(sock)
   1518         sock.listen()
   1519         self.background = threading.Thread(
   1520             target=self.run_server, args=(sock,))
   1521         self.background.start()
   1522         self.addCleanup(self.background.join)
   1523 
   1524         self.nntp = NNTP(support.HOST, port, usenetrc=False).__enter__()
   1525         self.addCleanup(self.nntp.__exit__, None, None, None)
   1526 
   1527     def run_server(self, sock):
   1528         # Could be generalized to handle more commands in separate methods
   1529         with sock:
   1530             [client, _] = sock.accept()
   1531         with contextlib.ExitStack() as cleanup:
   1532             cleanup.enter_context(client)
   1533             reader = cleanup.enter_context(client.makefile('rb'))
   1534             client.sendall(b'200 Server ready\r\n')
   1535             while True:
   1536                 cmd = reader.readline()
   1537                 if cmd == b'CAPABILITIES\r\n':
   1538                     client.sendall(
   1539                         b'101 Capability list:\r\n'
   1540                         b'VERSION 2\r\n'
   1541                         b'STARTTLS\r\n'
   1542                         b'.\r\n'
   1543                     )
   1544                 elif cmd == b'STARTTLS\r\n':
   1545                     reader.close()
   1546                     client.sendall(b'382 Begin TLS negotiation now\r\n')
   1547                     context = ssl.SSLContext()
   1548                     context.load_cert_chain(certfile)
   1549                     client = context.wrap_socket(
   1550                         client, server_side=True)
   1551                     cleanup.enter_context(client)
   1552                     reader = cleanup.enter_context(client.makefile('rb'))
   1553                 elif cmd == b'QUIT\r\n':
   1554                     client.sendall(b'205 Bye!\r\n')
   1555                     break
   1556                 else:
   1557                     raise ValueError('Unexpected command {!r}'.format(cmd))
   1558 
   1559     @unittest.skipUnless(ssl, 'requires SSL support')
   1560     def test_starttls(self):
   1561         file = self.nntp.file
   1562         sock = self.nntp.sock
   1563         self.nntp.starttls()
   1564         # Check that the socket and internal pseudo-file really were
   1565         # changed.
   1566         self.assertNotEqual(file, self.nntp.file)
   1567         self.assertNotEqual(sock, self.nntp.sock)
   1568         # Check that the new socket really is an SSL one
   1569         self.assertIsInstance(self.nntp.sock, ssl.SSLSocket)
   1570         # Check that trying starttls when it's already active fails.
   1571         self.assertRaises(ValueError, self.nntp.starttls)
   1572 
   1573 
# Run the test suite when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
   1576