Home | History | Annotate | Download | only in test
      1 import io
      2 import socket
      3 import datetime
      4 import textwrap
      5 import unittest
      6 import functools
      7 import contextlib
      8 import os.path
      9 import threading
     10 
     11 from test import support
     12 from nntplib import NNTP, GroupInfo
     13 import nntplib
     14 from unittest.mock import patch
     15 try:
     16     import ssl
     17 except ImportError:
     18     ssl = None
     19 
     20 
     21 TIMEOUT = 30
     22 certfile = os.path.join(os.path.dirname(__file__), 'keycert3.pem')
     23 
     24 # TODO:
     25 # - test the `file` arg to more commands
     26 # - test error conditions
     27 # - test auth and `usenetrc`
     28 
     29 
     30 class NetworkedNNTPTestsMixin:
     31 
     32     def test_welcome(self):
     33         welcome = self.server.getwelcome()
     34         self.assertEqual(str, type(welcome))
     35 
     36     def test_help(self):
     37         resp, lines = self.server.help()
     38         self.assertTrue(resp.startswith("100 "), resp)
     39         for line in lines:
     40             self.assertEqual(str, type(line))
     41 
     42     def test_list(self):
     43         resp, groups = self.server.list()
     44         if len(groups) > 0:
     45             self.assertEqual(GroupInfo, type(groups[0]))
     46             self.assertEqual(str, type(groups[0].group))
     47 
     48     def test_list_active(self):
     49         resp, groups = self.server.list(self.GROUP_PAT)
     50         if len(groups) > 0:
     51             self.assertEqual(GroupInfo, type(groups[0]))
     52             self.assertEqual(str, type(groups[0].group))
     53 
     54     def test_unknown_command(self):
     55         with self.assertRaises(nntplib.NNTPPermanentError) as cm:
     56             self.server._shortcmd("XYZZY")
     57         resp = cm.exception.response
     58         self.assertTrue(resp.startswith("500 "), resp)
     59 
     60     def test_newgroups(self):
     61         # gmane gets a constant influx of new groups.  In order not to stress
     62         # the server too much, we choose a recent date in the past.
     63         dt = datetime.date.today() - datetime.timedelta(days=7)
     64         resp, groups = self.server.newgroups(dt)
     65         if len(groups) > 0:
     66             self.assertIsInstance(groups[0], GroupInfo)
     67             self.assertIsInstance(groups[0].group, str)
     68 
     69     def test_description(self):
     70         def _check_desc(desc):
     71             # Sanity checks
     72             self.assertIsInstance(desc, str)
     73             self.assertNotIn(self.GROUP_NAME, desc)
     74         desc = self.server.description(self.GROUP_NAME)
     75         _check_desc(desc)
     76         # Another sanity check
     77         self.assertIn("Python", desc)
     78         # With a pattern
     79         desc = self.server.description(self.GROUP_PAT)
     80         _check_desc(desc)
     81         # Shouldn't exist
     82         desc = self.server.description("zk.brrtt.baz")
     83         self.assertEqual(desc, '')
     84 
     85     def test_descriptions(self):
     86         resp, descs = self.server.descriptions(self.GROUP_PAT)
     87         # 215 for LIST NEWSGROUPS, 282 for XGTITLE
     88         self.assertTrue(
     89             resp.startswith("215 ") or resp.startswith("282 "), resp)
     90         self.assertIsInstance(descs, dict)
     91         desc = descs[self.GROUP_NAME]
     92         self.assertEqual(desc, self.server.description(self.GROUP_NAME))
     93 
     94     def test_group(self):
     95         result = self.server.group(self.GROUP_NAME)
     96         self.assertEqual(5, len(result))
     97         resp, count, first, last, group = result
     98         self.assertEqual(group, self.GROUP_NAME)
     99         self.assertIsInstance(count, int)
    100         self.assertIsInstance(first, int)
    101         self.assertIsInstance(last, int)
    102         self.assertLessEqual(first, last)
    103         self.assertTrue(resp.startswith("211 "), resp)
    104 
    105     def test_date(self):
    106         resp, date = self.server.date()
    107         self.assertIsInstance(date, datetime.datetime)
    108         # Sanity check
    109         self.assertGreaterEqual(date.year, 1995)
    110         self.assertLessEqual(date.year, 2030)
    111 
    112     def _check_art_dict(self, art_dict):
    113         # Some sanity checks for a field dictionary returned by OVER / XOVER
    114         self.assertIsInstance(art_dict, dict)
    115         # NNTP has 7 mandatory fields
    116         self.assertGreaterEqual(art_dict.keys(),
    117             {"subject", "from", "date", "message-id",
    118              "references", ":bytes", ":lines"}
    119             )
    120         for v in art_dict.values():
    121             self.assertIsInstance(v, (str, type(None)))
    122 
    123     def test_xover(self):
    124         resp, count, first, last, name = self.server.group(self.GROUP_NAME)
    125         resp, lines = self.server.xover(last - 5, last)
    126         if len(lines) == 0:
    127             self.skipTest("no articles retrieved")
    128         # The 'last' article is not necessarily part of the output (cancelled?)
    129         art_num, art_dict = lines[0]
    130         self.assertGreaterEqual(art_num, last - 5)
    131         self.assertLessEqual(art_num, last)
    132         self._check_art_dict(art_dict)
    133 
    134     @unittest.skipIf(True, 'temporarily skipped until a permanent solution'
    135                            ' is found for issue #28971')
    136     def test_over(self):
    137         resp, count, first, last, name = self.server.group(self.GROUP_NAME)
    138         start = last - 10
    139         # The "start-" article range form
    140         resp, lines = self.server.over((start, None))
    141         art_num, art_dict = lines[0]
    142         self._check_art_dict(art_dict)
    143         # The "start-end" article range form
    144         resp, lines = self.server.over((start, last))
    145         art_num, art_dict = lines[-1]
    146         # The 'last' article is not necessarily part of the output (cancelled?)
    147         self.assertGreaterEqual(art_num, start)
    148         self.assertLessEqual(art_num, last)
    149         self._check_art_dict(art_dict)
    150         # XXX The "message_id" form is unsupported by gmane
    151         # 503 Overview by message-ID unsupported
    152 
    153     def test_xhdr(self):
    154         resp, count, first, last, name = self.server.group(self.GROUP_NAME)
    155         resp, lines = self.server.xhdr('subject', last)
    156         for line in lines:
    157             self.assertEqual(str, type(line[1]))
    158 
    159     def check_article_resp(self, resp, article, art_num=None):
    160         self.assertIsInstance(article, nntplib.ArticleInfo)
    161         if art_num is not None:
    162             self.assertEqual(article.number, art_num)
    163         for line in article.lines:
    164             self.assertIsInstance(line, bytes)
    165         # XXX this could exceptionally happen...
    166         self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))
    167 
    168     @unittest.skipIf(True, "FIXME: see bpo-32128")
    169     def test_article_head_body(self):
    170         resp, count, first, last, name = self.server.group(self.GROUP_NAME)
    171         # Try to find an available article
    172         for art_num in (last, first, last - 1):
    173             try:
    174                 resp, head = self.server.head(art_num)
    175             except nntplib.NNTPTemporaryError as e:
    176                 if not e.response.startswith("423 "):
    177                     raise
    178                 # "423 No such article" => choose another one
    179                 continue
    180             break
    181         else:
    182             self.skipTest("could not find a suitable article number")
    183         self.assertTrue(resp.startswith("221 "), resp)
    184         self.check_article_resp(resp, head, art_num)
    185         resp, body = self.server.body(art_num)
    186         self.assertTrue(resp.startswith("222 "), resp)
    187         self.check_article_resp(resp, body, art_num)
    188         resp, article = self.server.article(art_num)
    189         self.assertTrue(resp.startswith("220 "), resp)
    190         self.check_article_resp(resp, article, art_num)
    191         # Tolerate running the tests from behind a NNTP virus checker
    192         blacklist = lambda line: line.startswith(b'X-Antivirus')
    193         filtered_head_lines = [line for line in head.lines
    194                                if not blacklist(line)]
    195         filtered_lines = [line for line in article.lines
    196                           if not blacklist(line)]
    197         self.assertEqual(filtered_lines, filtered_head_lines + [b''] + body.lines)
    198 
    199     def test_capabilities(self):
    200         # The server under test implements NNTP version 2 and has a
    201         # couple of well-known capabilities. Just sanity check that we
    202         # got them.
    203         def _check_caps(caps):
    204             caps_list = caps['LIST']
    205             self.assertIsInstance(caps_list, (list, tuple))
    206             self.assertIn('OVERVIEW.FMT', caps_list)
    207         self.assertGreaterEqual(self.server.nntp_version, 2)
    208         _check_caps(self.server.getcapabilities())
    209         # This re-emits the command
    210         resp, caps = self.server.capabilities()
    211         _check_caps(caps)
    212 
    213     def test_zlogin(self):
    214         # This test must be the penultimate because further commands will be
    215         # refused.
    216         baduser = "notarealuser"
    217         badpw = "notarealpassword"
    218         # Check that bogus credentials cause failure
    219         self.assertRaises(nntplib.NNTPError, self.server.login,
    220                           user=baduser, password=badpw, usenetrc=False)
    221         # FIXME: We should check that correct credentials succeed, but that
    222         # would require valid details for some server somewhere to be in the
    223         # test suite, I think. Gmane is anonymous, at least as used for the
    224         # other tests.
    225 
    226     def test_zzquit(self):
    227         # This test must be called last, hence the name
    228         cls = type(self)
    229         try:
    230             self.server.quit()
    231         finally:
    232             cls.server = None
    233 
    234     @classmethod
    235     def wrap_methods(cls):
    236         # Wrap all methods in a transient_internet() exception catcher
    237         # XXX put a generic version in test.support?
    238         def wrap_meth(meth):
    239             @functools.wraps(meth)
    240             def wrapped(self):
    241                 with support.transient_internet(self.NNTP_HOST):
    242                     meth(self)
    243             return wrapped
    244         for name in dir(cls):
    245             if not name.startswith('test_'):
    246                 continue
    247             meth = getattr(cls, name)
    248             if not callable(meth):
    249                 continue
    250             # Need to use a closure so that meth remains bound to its current
    251             # value
    252             setattr(cls, name, wrap_meth(meth))
    253 
    254     def test_with_statement(self):
    255         def is_connected():
    256             if not hasattr(server, 'file'):
    257                 return False
    258             try:
    259                 server.help()
    260             except (OSError, EOFError):
    261                 return False
    262             return True
    263 
    264         with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server:
    265             self.assertTrue(is_connected())
    266             self.assertTrue(server.help())
    267         self.assertFalse(is_connected())
    268 
    269         with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server:
    270             server.quit()
    271         self.assertFalse(is_connected())
    272 
    273 
    274 NetworkedNNTPTestsMixin.wrap_methods()
    275 
    276 
    277 EOF_ERRORS = (EOFError,)
    278 if ssl is not None:
    279     EOF_ERRORS += (ssl.SSLEOFError,)
    280 
    281 
    282 class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
    283     # This server supports STARTTLS (gmane doesn't)
    284     NNTP_HOST = 'news.trigofacile.com'
    285     GROUP_NAME = 'fr.comp.lang.python'
    286     GROUP_PAT = 'fr.comp.lang.*'
    287 
    288     NNTP_CLASS = NNTP
    289 
    290     @classmethod
    291     def setUpClass(cls):
    292         support.requires("network")
    293         with support.transient_internet(cls.NNTP_HOST):
    294             try:
    295                 cls.server = cls.NNTP_CLASS(cls.NNTP_HOST, timeout=TIMEOUT,
    296                                             usenetrc=False)
    297             except EOF_ERRORS:
    298                 raise unittest.SkipTest(f"{cls} got EOF error on connecting "
    299                                         f"to {cls.NNTP_HOST!r}")
    300 
    301     @classmethod
    302     def tearDownClass(cls):
    303         if cls.server is not None:
    304             cls.server.quit()
    305 
    306 @unittest.skipUnless(ssl, 'requires SSL support')
    307 class NetworkedNNTP_SSLTests(NetworkedNNTPTests):
    308 
    309     # Technical limits for this public NNTP server (see http://www.aioe.org):
    310     # "Only two concurrent connections per IP address are allowed and
    311     # 400 connections per day are accepted from each IP address."
    312 
    313     NNTP_HOST = 'nntp.aioe.org'
    314     GROUP_NAME = 'comp.lang.python'
    315     GROUP_PAT = 'comp.lang.*'
    316 
    317     NNTP_CLASS = getattr(nntplib, 'NNTP_SSL', None)
    318 
    319     # Disabled as it produces too much data
    320     test_list = None
    321 
    322     # Disabled as the connection will already be encrypted.
    323     test_starttls = None
    324 
    325 
    326 #
    327 # Non-networked tests using a local server (or something mocking it).
    328 #
    329 
    330 class _NNTPServerIO(io.RawIOBase):
    331     """A raw IO object allowing NNTP commands to be received and processed
    332     by a handler.  The handler can push responses which can then be read
    333     from the IO object."""
    334 
    335     def __init__(self, handler):
    336         io.RawIOBase.__init__(self)
    337         # The channel from the client
    338         self.c2s = io.BytesIO()
    339         # The channel to the client
    340         self.s2c = io.BytesIO()
    341         self.handler = handler
    342         self.handler.start(self.c2s.readline, self.push_data)
    343 
    344     def readable(self):
    345         return True
    346 
    347     def writable(self):
    348         return True
    349 
    350     def push_data(self, data):
    351         """Push (buffer) some data to send to the client."""
    352         pos = self.s2c.tell()
    353         self.s2c.seek(0, 2)
    354         self.s2c.write(data)
    355         self.s2c.seek(pos)
    356 
    357     def write(self, b):
    358         """The client sends us some data"""
    359         pos = self.c2s.tell()
    360         self.c2s.write(b)
    361         self.c2s.seek(pos)
    362         self.handler.process_pending()
    363         return len(b)
    364 
    365     def readinto(self, buf):
    366         """The client wants to read a response"""
    367         self.handler.process_pending()
    368         b = self.s2c.read(len(buf))
    369         n = len(b)
    370         buf[:n] = b
    371         return n
    372 
    373 
    374 def make_mock_file(handler):
    375     sio = _NNTPServerIO(handler)
    376     # Using BufferedRWPair instead of BufferedRandom ensures the file
    377     # isn't seekable.
    378     file = io.BufferedRWPair(sio, sio)
    379     return (sio, file)
    380 
    381 
    382 class MockedNNTPTestsMixin:
    383     # Override in derived classes
    384     handler_class = None
    385 
    386     def setUp(self):
    387         super().setUp()
    388         self.make_server()
    389 
    390     def tearDown(self):
    391         super().tearDown()
    392         del self.server
    393 
    394     def make_server(self, *args, **kwargs):
    395         self.handler = self.handler_class()
    396         self.sio, file = make_mock_file(self.handler)
    397         self.server = nntplib._NNTPBase(file, 'test.server', *args, **kwargs)
    398         return self.server
    399 
    400 
    401 class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin):
    402     def setUp(self):
    403         super().setUp()
    404         self.make_server(readermode=True)
    405 
    406 
    407 class NNTPv1Handler:
    408     """A handler for RFC 977"""
    409 
    410     welcome = "200 NNTP mock server"
    411 
    412     def start(self, readline, push_data):
    413         self.in_body = False
    414         self.allow_posting = True
    415         self._readline = readline
    416         self._push_data = push_data
    417         self._logged_in = False
    418         self._user_sent = False
    419         # Our welcome
    420         self.handle_welcome()
    421 
    422     def _decode(self, data):
    423         return str(data, "utf-8", "surrogateescape")
    424 
    425     def process_pending(self):
    426         if self.in_body:
    427             while True:
    428                 line = self._readline()
    429                 if not line:
    430                     return
    431                 self.body.append(line)
    432                 if line == b".\r\n":
    433                     break
    434             try:
    435                 meth, tokens = self.body_callback
    436                 meth(*tokens, body=self.body)
    437             finally:
    438                 self.body_callback = None
    439                 self.body = None
    440                 self.in_body = False
    441         while True:
    442             line = self._decode(self._readline())
    443             if not line:
    444                 return
    445             if not line.endswith("\r\n"):
    446                 raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
    447             line = line[:-2]
    448             cmd, *tokens = line.split()
    449             #meth = getattr(self.handler, "handle_" + cmd.upper(), None)
    450             meth = getattr(self, "handle_" + cmd.upper(), None)
    451             if meth is None:
    452                 self.handle_unknown()
    453             else:
    454                 try:
    455                     meth(*tokens)
    456                 except Exception as e:
    457                     raise ValueError("command failed: {!r}".format(line)) from e
    458                 else:
    459                     if self.in_body:
    460                         self.body_callback = meth, tokens
    461                         self.body = []
    462 
    463     def expect_body(self):
    464         """Flag that the client is expected to post a request body"""
    465         self.in_body = True
    466 
    467     def push_data(self, data):
    468         """Push some binary data"""
    469         self._push_data(data)
    470 
    471     def push_lit(self, lit):
    472         """Push a string literal"""
    473         lit = textwrap.dedent(lit)
    474         lit = "\r\n".join(lit.splitlines()) + "\r\n"
    475         lit = lit.encode('utf-8')
    476         self.push_data(lit)
    477 
    478     def handle_unknown(self):
    479         self.push_lit("500 What?")
    480 
    481     def handle_welcome(self):
    482         self.push_lit(self.welcome)
    483 
    484     def handle_QUIT(self):
    485         self.push_lit("205 Bye!")
    486 
    487     def handle_DATE(self):
    488         self.push_lit("111 20100914001155")
    489 
    490     def handle_GROUP(self, group):
    491         if group == "fr.comp.lang.python":
    492             self.push_lit("211 486 761 1265 fr.comp.lang.python")
    493         else:
    494             self.push_lit("411 No such group {}".format(group))
    495 
    496     def handle_HELP(self):
    497         self.push_lit("""\
    498             100 Legal commands
    499               authinfo user Name|pass Password|generic <prog> <args>
    500               date
    501               help
    502             Report problems to <root (at] example.org>
    503             .""")
    504 
    505     def handle_STAT(self, message_spec=None):
    506         if message_spec is None:
    507             self.push_lit("412 No newsgroup selected")
    508         elif message_spec == "3000234":
    509             self.push_lit("223 3000234 <45223423 (at] example.com>")
    510         elif message_spec == "<45223423 (at] example.com>":
    511             self.push_lit("223 0 <45223423 (at] example.com>")
    512         else:
    513             self.push_lit("430 No Such Article Found")
    514 
    515     def handle_NEXT(self):
    516         self.push_lit("223 3000237 <668929 (at] example.org> retrieved")
    517 
    518     def handle_LAST(self):
    519         self.push_lit("223 3000234 <45223423 (at] example.com> retrieved")
    520 
    521     def handle_LIST(self, action=None, param=None):
    522         if action is None:
    523             self.push_lit("""\
    524                 215 Newsgroups in form "group high low flags".
    525                 comp.lang.python 0000052340 0000002828 y
    526                 comp.lang.python.announce 0000001153 0000000993 m
    527                 free.it.comp.lang.python 0000000002 0000000002 y
    528                 fr.comp.lang.python 0000001254 0000000760 y
    529                 free.it.comp.lang.python.learner 0000000000 0000000001 y
    530                 tw.bbs.comp.lang.python 0000000304 0000000304 y
    531                 .""")
    532         elif action == "ACTIVE":
    533             if param == "*distutils*":
    534                 self.push_lit("""\
    535                     215 Newsgroups in form "group high low flags"
    536                     gmane.comp.python.distutils.devel 0000014104 0000000001 m
    537                     gmane.comp.python.distutils.cvs 0000000000 0000000001 m
    538                     .""")
    539             else:
    540                 self.push_lit("""\
    541                     215 Newsgroups in form "group high low flags"
    542                     .""")
    543         elif action == "OVERVIEW.FMT":
    544             self.push_lit("""\
    545                 215 Order of fields in overview database.
    546                 Subject:
    547                 From:
    548                 Date:
    549                 Message-ID:
    550                 References:
    551                 Bytes:
    552                 Lines:
    553                 Xref:full
    554                 .""")
    555         elif action == "NEWSGROUPS":
    556             assert param is not None
    557             if param == "comp.lang.python":
    558                 self.push_lit("""\
    559                     215 Descriptions in form "group description".
    560                     comp.lang.python\tThe Python computer language.
    561                     .""")
    562             elif param == "comp.lang.python*":
    563                 self.push_lit("""\
    564                     215 Descriptions in form "group description".
    565                     comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
    566                     comp.lang.python\tThe Python computer language.
    567                     .""")
    568             else:
    569                 self.push_lit("""\
    570                     215 Descriptions in form "group description".
    571                     .""")
    572         else:
    573             self.push_lit('501 Unknown LIST keyword')
    574 
    575     def handle_NEWNEWS(self, group, date_str, time_str):
    576         # We hard code different return messages depending on passed
    577         # argument and date syntax.
    578         if (group == "comp.lang.python" and date_str == "20100913"
    579             and time_str == "082004"):
    580             # Date was passed in RFC 3977 format (NNTP "v2")
    581             self.push_lit("""\
    582                 230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
    583                 <a4929a40-6328-491a-aaaf-cb79ed7309a2 (at] q2g2000vbk.googlegroups.com>
    584                 <f30c0419-f549-4218-848f-d7d0131da931 (at] y3g2000vbm.googlegroups.com>
    585                 .""")
    586         elif (group == "comp.lang.python" and date_str == "100913"
    587             and time_str == "082004"):
    588             # Date was passed in RFC 977 format (NNTP "v1")
    589             self.push_lit("""\
    590                 230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
    591                 <a4929a40-6328-491a-aaaf-cb79ed7309a2 (at] q2g2000vbk.googlegroups.com>
    592                 <f30c0419-f549-4218-848f-d7d0131da931 (at] y3g2000vbm.googlegroups.com>
    593                 .""")
    594         elif (group == 'comp.lang.python' and
    595               date_str in ('20100101', '100101') and
    596               time_str == '090000'):
    597             self.push_lit('too long line' * 3000 +
    598                           '\n.')
    599         else:
    600             self.push_lit("""\
    601                 230 An empty list of newsarticles follows
    602                 .""")
    603         # (Note for experiments: many servers disable NEWNEWS.
    604         #  As of this writing, sicinfo3.epfl.ch doesn't.)
    605 
    606     def handle_XOVER(self, message_spec):
    607         if message_spec == "57-59":
    608             self.push_lit(
    609                 "224 Overview information for 57-58 follows\n"
    610                 "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
    611                     "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w (at] public.gmane.org>"
    612                     "\tSat, 19 Jun 2010 18:04:08 -0400"
    613                     "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C (at] gmail.com>"
    614                     "\t<hvalf7$ort$1 (at] dough.gmane.org>\t7103\t16"
    615                     "\tXref: news.gmane.org gmane.comp.python.authors:57"
    616                     "\n"
    617                 "58\tLooking for a few good bloggers"
    618                     "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w (at] public.gmane.org>"
    619                     "\tThu, 22 Jul 2010 09:14:14 -0400"
    620                     "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF (at] gmail.com>"
    621                     "\t\t6683\t16"
    622                     "\t"
    623                     "\n"
    624                 # A UTF-8 overview line from fr.comp.lang.python
    625                 "59\tRe: Message d'erreur incomprhensible (par moi)"
    626                     "\tEric Brunel <eric.brunel (at] pragmadev.nospam.com>"
    627                     "\tWed, 15 Sep 2010 18:09:15 +0200"
    628                     "\t<eric.brunel-2B8B56.18091515092010 (at] news.wanadoo.fr>"
    629                     "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
    630                     "\tXref: saria.nerim.net fr.comp.lang.python:1265"
    631                     "\n"
    632                 ".\n")
    633         else:
    634             self.push_lit("""\
    635                 224 No articles
    636                 .""")
    637 
    638     def handle_POST(self, *, body=None):
    639         if body is None:
    640             if self.allow_posting:
    641                 self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
    642                 self.expect_body()
    643             else:
    644                 self.push_lit("440 Posting not permitted")
    645         else:
    646             assert self.allow_posting
    647             self.push_lit("240 Article received OK")
    648             self.posted_body = body
    649 
    650     def handle_IHAVE(self, message_id, *, body=None):
    651         if body is None:
    652             if (self.allow_posting and
    653                 message_id == "<i.am.an.article.you.will.want (at] example.com>"):
    654                 self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
    655                 self.expect_body()
    656             else:
    657                 self.push_lit("435 Article not wanted")
    658         else:
    659             assert self.allow_posting
    660             self.push_lit("235 Article transferred OK")
    661             self.posted_body = body
    662 
    663     sample_head = """\
    664         From: "Demo User" <nobody (at] example.net>
    665         Subject: I am just a test article
    666         Content-Type: text/plain; charset=UTF-8; format=flowed
    667         Message-ID: <i.am.an.article.you.will.want (at] example.com>"""
    668 
    669     sample_body = """\
    670         This is just a test article.
    671         ..Here is a dot-starting line.
    672 
    673         -- Signed by Andr\xe9."""
    674 
    675     sample_article = sample_head + "\n\n" + sample_body
    676 
    677     def handle_ARTICLE(self, message_spec=None):
    678         if message_spec is None:
    679             self.push_lit("220 3000237 <45223423 (at] example.com>")
    680         elif message_spec == "<45223423 (at] example.com>":
    681             self.push_lit("220 0 <45223423 (at] example.com>")
    682         elif message_spec == "3000234":
    683             self.push_lit("220 3000234 <45223423 (at] example.com>")
    684         else:
    685             self.push_lit("430 No Such Article Found")
    686             return
    687         self.push_lit(self.sample_article)
    688         self.push_lit(".")
    689 
    690     def handle_HEAD(self, message_spec=None):
    691         if message_spec is None:
    692             self.push_lit("221 3000237 <45223423 (at] example.com>")
    693         elif message_spec == "<45223423 (at] example.com>":
    694             self.push_lit("221 0 <45223423 (at] example.com>")
    695         elif message_spec == "3000234":
    696             self.push_lit("221 3000234 <45223423 (at] example.com>")
    697         else:
    698             self.push_lit("430 No Such Article Found")
    699             return
    700         self.push_lit(self.sample_head)
    701         self.push_lit(".")
    702 
    703     def handle_BODY(self, message_spec=None):
    704         if message_spec is None:
    705             self.push_lit("222 3000237 <45223423 (at] example.com>")
    706         elif message_spec == "<45223423 (at] example.com>":
    707             self.push_lit("222 0 <45223423 (at] example.com>")
    708         elif message_spec == "3000234":
    709             self.push_lit("222 3000234 <45223423 (at] example.com>")
    710         else:
    711             self.push_lit("430 No Such Article Found")
    712             return
    713         self.push_lit(self.sample_body)
    714         self.push_lit(".")
    715 
    716     def handle_AUTHINFO(self, cred_type, data):
    717         if self._logged_in:
    718             self.push_lit('502 Already Logged In')
    719         elif cred_type == 'user':
    720             if self._user_sent:
    721                 self.push_lit('482 User Credential Already Sent')
    722             else:
    723                 self.push_lit('381 Password Required')
    724                 self._user_sent = True
    725         elif cred_type == 'pass':
    726             self.push_lit('281 Login Successful')
    727             self._logged_in = True
    728         else:
    729             raise Exception('Unknown cred type {}'.format(cred_type))
    730 
    731 
    732 class NNTPv2Handler(NNTPv1Handler):
    733     """A handler for RFC 3977 (NNTP "v2")"""
    734 
    735     def handle_CAPABILITIES(self):
    736         fmt = """\
    737             101 Capability list:
    738             VERSION 2 3
    739             IMPLEMENTATION INN 2.5.1{}
    740             HDR
    741             LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
    742             OVER
    743             POST
    744             READER
    745             ."""
    746 
    747         if not self._logged_in:
    748             self.push_lit(fmt.format('\n            AUTHINFO USER'))
    749         else:
    750             self.push_lit(fmt.format(''))
    751 
    752     def handle_MODE(self, _):
    753         raise Exception('MODE READER sent despite READER has been advertised')
    754 
    755     def handle_OVER(self, message_spec=None):
    756         return self.handle_XOVER(message_spec)
    757 
    758 
    759 class CapsAfterLoginNNTPv2Handler(NNTPv2Handler):
    760     """A handler that allows CAPABILITIES only after login"""
    761 
    762     def handle_CAPABILITIES(self):
    763         if not self._logged_in:
    764             self.push_lit('480 You must log in.')
    765         else:
    766             super().handle_CAPABILITIES()
    767 
    768 
    769 class ModeSwitchingNNTPv2Handler(NNTPv2Handler):
    770     """A server that starts in transit mode"""
    771 
    772     def __init__(self):
    773         self._switched = False
    774 
    775     def handle_CAPABILITIES(self):
    776         fmt = """\
    777             101 Capability list:
    778             VERSION 2 3
    779             IMPLEMENTATION INN 2.5.1
    780             HDR
    781             LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
    782             OVER
    783             POST
    784             {}READER
    785             ."""
    786         if self._switched:
    787             self.push_lit(fmt.format(''))
    788         else:
    789             self.push_lit(fmt.format('MODE-'))
    790 
    791     def handle_MODE(self, what):
    792         assert not self._switched and what == 'reader'
    793         self._switched = True
    794         self.push_lit('200 Posting allowed')
    795 
    796 
    797 class NNTPv1v2TestsMixin:
    798 
    799     def setUp(self):
    800         super().setUp()
    801 
    802     def test_welcome(self):
    803         self.assertEqual(self.server.welcome, self.handler.welcome)
    804 
    805     def test_authinfo(self):
    806         if self.nntp_version == 2:
    807             self.assertIn('AUTHINFO', self.server._caps)
    808         self.server.login('testuser', 'testpw')
    809         # if AUTHINFO is gone from _caps we also know that getcapabilities()
    810         # has been called after login as it should
    811         self.assertNotIn('AUTHINFO', self.server._caps)
    812 
    813     def test_date(self):
    814         resp, date = self.server.date()
    815         self.assertEqual(resp, "111 20100914001155")
    816         self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55))
    817 
    818     def test_quit(self):
    819         self.assertFalse(self.sio.closed)
    820         resp = self.server.quit()
    821         self.assertEqual(resp, "205 Bye!")
    822         self.assertTrue(self.sio.closed)
    823 
    824     def test_help(self):
    825         resp, help = self.server.help()
    826         self.assertEqual(resp, "100 Legal commands")
    827         self.assertEqual(help, [
    828             '  authinfo user Name|pass Password|generic <prog> <args>',
    829             '  date',
    830             '  help',
    831             'Report problems to <root (at] example.org>',
    832         ])
    833 
    834     def test_list(self):
    835         resp, groups = self.server.list()
    836         self.assertEqual(len(groups), 6)
    837         g = groups[1]
    838         self.assertEqual(g,
    839             GroupInfo("comp.lang.python.announce", "0000001153",
    840                       "0000000993", "m"))
    841         resp, groups = self.server.list("*distutils*")
    842         self.assertEqual(len(groups), 2)
    843         g = groups[0]
    844         self.assertEqual(g,
    845             GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
    846                       "0000000001", "m"))
    847 
    848     def test_stat(self):
    849         resp, art_num, message_id = self.server.stat(3000234)
    850         self.assertEqual(resp, "223 3000234 <45223423 (at] example.com>")
    851         self.assertEqual(art_num, 3000234)
    852         self.assertEqual(message_id, "<45223423 (at] example.com>")
    853         resp, art_num, message_id = self.server.stat("<45223423 (at] example.com>")
    854         self.assertEqual(resp, "223 0 <45223423 (at] example.com>")
    855         self.assertEqual(art_num, 0)
    856         self.assertEqual(message_id, "<45223423 (at] example.com>")
    857         with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
    858             self.server.stat("<non.existent.id>")
    859         self.assertEqual(cm.exception.response, "430 No Such Article Found")
    860         with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
    861             self.server.stat()
    862         self.assertEqual(cm.exception.response, "412 No newsgroup selected")
    863 
    864     def test_next(self):
    865         resp, art_num, message_id = self.server.next()
    866         self.assertEqual(resp, "223 3000237 <668929 (at] example.org> retrieved")
    867         self.assertEqual(art_num, 3000237)
    868         self.assertEqual(message_id, "<668929 (at] example.org>")
    869 
    870     def test_last(self):
    871         resp, art_num, message_id = self.server.last()
    872         self.assertEqual(resp, "223 3000234 <45223423 (at] example.com> retrieved")
    873         self.assertEqual(art_num, 3000234)
    874         self.assertEqual(message_id, "<45223423 (at] example.com>")
    875 
    876     def test_description(self):
    877         desc = self.server.description("comp.lang.python")
    878         self.assertEqual(desc, "The Python computer language.")
    879         desc = self.server.description("comp.lang.pythonx")
    880         self.assertEqual(desc, "")
    881 
    882     def test_descriptions(self):
    883         resp, groups = self.server.descriptions("comp.lang.python")
    884         self.assertEqual(resp, '215 Descriptions in form "group description".')
    885         self.assertEqual(groups, {
    886             "comp.lang.python": "The Python computer language.",
    887             })
    888         resp, groups = self.server.descriptions("comp.lang.python*")
    889         self.assertEqual(groups, {
    890             "comp.lang.python": "The Python computer language.",
    891             "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
    892             })
    893         resp, groups = self.server.descriptions("comp.lang.pythonx")
    894         self.assertEqual(groups, {})
    895 
    896     def test_group(self):
    897         resp, count, first, last, group = self.server.group("fr.comp.lang.python")
    898         self.assertTrue(resp.startswith("211 "), resp)
    899         self.assertEqual(first, 761)
    900         self.assertEqual(last, 1265)
    901         self.assertEqual(count, 486)
    902         self.assertEqual(group, "fr.comp.lang.python")
    903         with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
    904             self.server.group("comp.lang.python.devel")
    905         exc = cm.exception
    906         self.assertTrue(exc.response.startswith("411 No such group"),
    907                         exc.response)
    908 
    909     def test_newnews(self):
    910         # NEWNEWS comp.lang.python [20]100913 082004
    911         dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
    912         resp, ids = self.server.newnews("comp.lang.python", dt)
    913         expected = (
    914             "230 list of newsarticles (NNTP v{0}) "
    915             "created after Mon Sep 13 08:20:04 2010 follows"
    916             ).format(self.nntp_version)
    917         self.assertEqual(resp, expected)
    918         self.assertEqual(ids, [
    919             "<a4929a40-6328-491a-aaaf-cb79ed7309a2 (at] q2g2000vbk.googlegroups.com>",
    920             "<f30c0419-f549-4218-848f-d7d0131da931 (at] y3g2000vbm.googlegroups.com>",
    921             ])
    922         # NEWNEWS fr.comp.lang.python [20]100913 082004
    923         dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
    924         resp, ids = self.server.newnews("fr.comp.lang.python", dt)
    925         self.assertEqual(resp, "230 An empty list of newsarticles follows")
    926         self.assertEqual(ids, [])
    927 
    928     def _check_article_body(self, lines):
    929         self.assertEqual(len(lines), 4)
    930         self.assertEqual(lines[-1].decode('utf-8'), "-- Signed by Andr.")
    931         self.assertEqual(lines[-2], b"")
    932         self.assertEqual(lines[-3], b".Here is a dot-starting line.")
    933         self.assertEqual(lines[-4], b"This is just a test article.")
    934 
    935     def _check_article_head(self, lines):
    936         self.assertEqual(len(lines), 4)
    937         self.assertEqual(lines[0], b'From: "Demo User" <nobody (at] example.net>')
    938         self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want (at] example.com>")
    939 
    940     def _check_article_data(self, lines):
    941         self.assertEqual(len(lines), 9)
    942         self._check_article_head(lines[:4])
    943         self._check_article_body(lines[-4:])
    944         self.assertEqual(lines[4], b"")
    945 
    946     def test_article(self):
    947         # ARTICLE
    948         resp, info = self.server.article()
    949         self.assertEqual(resp, "220 3000237 <45223423 (at] example.com>")
    950         art_num, message_id, lines = info
    951         self.assertEqual(art_num, 3000237)
    952         self.assertEqual(message_id, "<45223423 (at] example.com>")
    953         self._check_article_data(lines)
    954         # ARTICLE num
    955         resp, info = self.server.article(3000234)
    956         self.assertEqual(resp, "220 3000234 <45223423 (at] example.com>")
    957         art_num, message_id, lines = info
    958         self.assertEqual(art_num, 3000234)
    959         self.assertEqual(message_id, "<45223423 (at] example.com>")
    960         self._check_article_data(lines)
    961         # ARTICLE id
    962         resp, info = self.server.article("<45223423 (at] example.com>")
    963         self.assertEqual(resp, "220 0 <45223423 (at] example.com>")
    964         art_num, message_id, lines = info
    965         self.assertEqual(art_num, 0)
    966         self.assertEqual(message_id, "<45223423 (at] example.com>")
    967         self._check_article_data(lines)
    968         # Non-existent id
    969         with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
    970             self.server.article("<non-existent (at] example.com>")
    971         self.assertEqual(cm.exception.response, "430 No Such Article Found")
    972 
    973     def test_article_file(self):
    974         # With a "file" argument
    975         f = io.BytesIO()
    976         resp, info = self.server.article(file=f)
    977         self.assertEqual(resp, "220 3000237 <45223423 (at] example.com>")
    978         art_num, message_id, lines = info
    979         self.assertEqual(art_num, 3000237)
    980         self.assertEqual(message_id, "<45223423 (at] example.com>")
    981         self.assertEqual(lines, [])
    982         data = f.getvalue()
    983         self.assertTrue(data.startswith(
    984             b'From: "Demo User" <nobody (at] example.net>\r\n'
    985             b'Subject: I am just a test article\r\n'
    986             ), ascii(data))
    987         self.assertTrue(data.endswith(
    988             b'This is just a test article.\r\n'
    989             b'.Here is a dot-starting line.\r\n'
    990             b'\r\n'
    991             b'-- Signed by Andr\xc3\xa9.\r\n'
    992             ), ascii(data))
    993 
    994     def test_head(self):
    995         # HEAD
    996         resp, info = self.server.head()
    997         self.assertEqual(resp, "221 3000237 <45223423 (at] example.com>")
    998         art_num, message_id, lines = info
    999         self.assertEqual(art_num, 3000237)
   1000         self.assertEqual(message_id, "<45223423 (at] example.com>")
   1001         self._check_article_head(lines)
   1002         # HEAD num
   1003         resp, info = self.server.head(3000234)
   1004         self.assertEqual(resp, "221 3000234 <45223423 (at] example.com>")
   1005         art_num, message_id, lines = info
   1006         self.assertEqual(art_num, 3000234)
   1007         self.assertEqual(message_id, "<45223423 (at] example.com>")
   1008         self._check_article_head(lines)
   1009         # HEAD id
   1010         resp, info = self.server.head("<45223423 (at] example.com>")
   1011         self.assertEqual(resp, "221 0 <45223423 (at] example.com>")
   1012         art_num, message_id, lines = info
   1013         self.assertEqual(art_num, 0)
   1014         self.assertEqual(message_id, "<45223423 (at] example.com>")
   1015         self._check_article_head(lines)
   1016         # Non-existent id
   1017         with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
   1018             self.server.head("<non-existent (at] example.com>")
   1019         self.assertEqual(cm.exception.response, "430 No Such Article Found")
   1020 
   1021     def test_head_file(self):
   1022         f = io.BytesIO()
   1023         resp, info = self.server.head(file=f)
   1024         self.assertEqual(resp, "221 3000237 <45223423 (at] example.com>")
   1025         art_num, message_id, lines = info
   1026         self.assertEqual(art_num, 3000237)
   1027         self.assertEqual(message_id, "<45223423 (at] example.com>")
   1028         self.assertEqual(lines, [])
   1029         data = f.getvalue()
   1030         self.assertTrue(data.startswith(
   1031             b'From: "Demo User" <nobody (at] example.net>\r\n'
   1032             b'Subject: I am just a test article\r\n'
   1033             ), ascii(data))
   1034         self.assertFalse(data.endswith(
   1035             b'This is just a test article.\r\n'
   1036             b'.Here is a dot-starting line.\r\n'
   1037             b'\r\n'
   1038             b'-- Signed by Andr\xc3\xa9.\r\n'
   1039             ), ascii(data))
   1040 
   1041     def test_body(self):
   1042         # BODY
   1043         resp, info = self.server.body()
   1044         self.assertEqual(resp, "222 3000237 <45223423 (at] example.com>")
   1045         art_num, message_id, lines = info
   1046         self.assertEqual(art_num, 3000237)
   1047         self.assertEqual(message_id, "<45223423 (at] example.com>")
   1048         self._check_article_body(lines)
   1049         # BODY num
   1050         resp, info = self.server.body(3000234)
   1051         self.assertEqual(resp, "222 3000234 <45223423 (at] example.com>")
   1052         art_num, message_id, lines = info
   1053         self.assertEqual(art_num, 3000234)
   1054         self.assertEqual(message_id, "<45223423 (at] example.com>")
   1055         self._check_article_body(lines)
   1056         # BODY id
   1057         resp, info = self.server.body("<45223423 (at] example.com>")
   1058         self.assertEqual(resp, "222 0 <45223423 (at] example.com>")
   1059         art_num, message_id, lines = info
   1060         self.assertEqual(art_num, 0)
   1061         self.assertEqual(message_id, "<45223423 (at] example.com>")
   1062         self._check_article_body(lines)
   1063         # Non-existent id
   1064         with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
   1065             self.server.body("<non-existent (at] example.com>")
   1066         self.assertEqual(cm.exception.response, "430 No Such Article Found")
   1067 
   1068     def test_body_file(self):
   1069         f = io.BytesIO()
   1070         resp, info = self.server.body(file=f)
   1071         self.assertEqual(resp, "222 3000237 <45223423 (at] example.com>")
   1072         art_num, message_id, lines = info
   1073         self.assertEqual(art_num, 3000237)
   1074         self.assertEqual(message_id, "<45223423 (at] example.com>")
   1075         self.assertEqual(lines, [])
   1076         data = f.getvalue()
   1077         self.assertFalse(data.startswith(
   1078             b'From: "Demo User" <nobody (at] example.net>\r\n'
   1079             b'Subject: I am just a test article\r\n'
   1080             ), ascii(data))
   1081         self.assertTrue(data.endswith(
   1082             b'This is just a test article.\r\n'
   1083             b'.Here is a dot-starting line.\r\n'
   1084             b'\r\n'
   1085             b'-- Signed by Andr\xc3\xa9.\r\n'
   1086             ), ascii(data))
   1087 
   1088     def check_over_xover_resp(self, resp, overviews):
   1089         self.assertTrue(resp.startswith("224 "), resp)
   1090         self.assertEqual(len(overviews), 3)
   1091         art_num, over = overviews[0]
   1092         self.assertEqual(art_num, 57)
   1093         self.assertEqual(over, {
   1094             "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w (at] public.gmane.org>",
   1095             "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
   1096             "date": "Sat, 19 Jun 2010 18:04:08 -0400",
   1097             "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C (at] gmail.com>",
   1098             "references": "<hvalf7$ort$1 (at] dough.gmane.org>",
   1099             ":bytes": "7103",
   1100             ":lines": "16",
   1101             "xref": "news.gmane.org gmane.comp.python.authors:57"
   1102             })
   1103         art_num, over = overviews[1]
   1104         self.assertEqual(over["xref"], None)
   1105         art_num, over = overviews[2]
   1106         self.assertEqual(over["subject"],
   1107                          "Re: Message d'erreur incomprhensible (par moi)")
   1108 
   1109     def test_xover(self):
   1110         resp, overviews = self.server.xover(57, 59)
   1111         self.check_over_xover_resp(resp, overviews)
   1112 
   1113     def test_over(self):
   1114         # In NNTP "v1", this will fallback on XOVER
   1115         resp, overviews = self.server.over((57, 59))
   1116         self.check_over_xover_resp(resp, overviews)
   1117 
   1118     sample_post = (
   1119         b'From: "Demo User" <nobody (at] example.net>\r\n'
   1120         b'Subject: I am just a test article\r\n'
   1121         b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n'
   1122         b'Message-ID: <i.am.an.article.you.will.want (at] example.com>\r\n'
   1123         b'\r\n'
   1124         b'This is just a test article.\r\n'
   1125         b'.Here is a dot-starting line.\r\n'
   1126         b'\r\n'
   1127         b'-- Signed by Andr\xc3\xa9.\r\n'
   1128     )
   1129 
   1130     def _check_posted_body(self):
   1131         # Check the raw body as received by the server
   1132         lines = self.handler.posted_body
   1133         # One additional line for the "." terminator
   1134         self.assertEqual(len(lines), 10)
   1135         self.assertEqual(lines[-1], b'.\r\n')
   1136         self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
   1137         self.assertEqual(lines[-3], b'\r\n')
   1138         self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n')
   1139         self.assertEqual(lines[0], b'From: "Demo User" <nobody (at] example.net>\r\n')
   1140 
   1141     def _check_post_ihave_sub(self, func, *args, file_factory):
   1142         # First the prepared post with CRLF endings
   1143         post = self.sample_post
   1144         func_args = args + (file_factory(post),)
   1145         self.handler.posted_body = None
   1146         resp = func(*func_args)
   1147         self._check_posted_body()
   1148         # Then the same post with "normal" line endings - they should be
   1149         # converted by NNTP.post and NNTP.ihave.
   1150         post = self.sample_post.replace(b"\r\n", b"\n")
   1151         func_args = args + (file_factory(post),)
   1152         self.handler.posted_body = None
   1153         resp = func(*func_args)
   1154         self._check_posted_body()
   1155         return resp
   1156 
   1157     def check_post_ihave(self, func, success_resp, *args):
   1158         # With a bytes object
   1159         resp = self._check_post_ihave_sub(func, *args, file_factory=bytes)
   1160         self.assertEqual(resp, success_resp)
   1161         # With a bytearray object
   1162         resp = self._check_post_ihave_sub(func, *args, file_factory=bytearray)
   1163         self.assertEqual(resp, success_resp)
   1164         # With a file object
   1165         resp = self._check_post_ihave_sub(func, *args, file_factory=io.BytesIO)
   1166         self.assertEqual(resp, success_resp)
   1167         # With an iterable of terminated lines
   1168         def iterlines(b):
   1169             return iter(b.splitlines(keepends=True))
   1170         resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
   1171         self.assertEqual(resp, success_resp)
   1172         # With an iterable of non-terminated lines
   1173         def iterlines(b):
   1174             return iter(b.splitlines(keepends=False))
   1175         resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
   1176         self.assertEqual(resp, success_resp)
   1177 
   1178     def test_post(self):
   1179         self.check_post_ihave(self.server.post, "240 Article received OK")
   1180         self.handler.allow_posting = False
   1181         with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
   1182             self.server.post(self.sample_post)
   1183         self.assertEqual(cm.exception.response,
   1184                          "440 Posting not permitted")
   1185 
   1186     def test_ihave(self):
   1187         self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
   1188                               "<i.am.an.article.you.will.want (at] example.com>")
   1189         with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
   1190             self.server.ihave("<another.message.id>", self.sample_post)
   1191         self.assertEqual(cm.exception.response,
   1192                          "435 Article not wanted")
   1193 
   1194     def test_too_long_lines(self):
   1195         dt = datetime.datetime(2010, 1, 1, 9, 0, 0)
   1196         self.assertRaises(nntplib.NNTPDataError,
   1197                           self.server.newnews, "comp.lang.python", dt)
   1198 
   1199 
   1200 class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
   1201     """Tests an NNTP v1 server (no capabilities)."""
   1202 
   1203     nntp_version = 1
   1204     handler_class = NNTPv1Handler
   1205 
   1206     def test_caps(self):
   1207         caps = self.server.getcapabilities()
   1208         self.assertEqual(caps, {})
   1209         self.assertEqual(self.server.nntp_version, 1)
   1210         self.assertEqual(self.server.nntp_implementation, None)
   1211 
   1212 
   1213 class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
   1214     """Tests an NNTP v2 server (with capabilities)."""
   1215 
   1216     nntp_version = 2
   1217     handler_class = NNTPv2Handler
   1218 
   1219     def test_caps(self):
   1220         caps = self.server.getcapabilities()
   1221         self.assertEqual(caps, {
   1222             'VERSION': ['2', '3'],
   1223             'IMPLEMENTATION': ['INN', '2.5.1'],
   1224             'AUTHINFO': ['USER'],
   1225             'HDR': [],
   1226             'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
   1227                      'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
   1228             'OVER': [],
   1229             'POST': [],
   1230             'READER': [],
   1231             })
   1232         self.assertEqual(self.server.nntp_version, 3)
   1233         self.assertEqual(self.server.nntp_implementation, 'INN 2.5.1')
   1234 
   1235 
   1236 class CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase):
   1237     """Tests a probably NNTP v2 server with capabilities only after login."""
   1238 
   1239     nntp_version = 2
   1240     handler_class = CapsAfterLoginNNTPv2Handler
   1241 
   1242     def test_caps_only_after_login(self):
   1243         self.assertEqual(self.server._caps, {})
   1244         self.server.login('testuser', 'testpw')
   1245         self.assertIn('VERSION', self.server._caps)
   1246 
   1247 
   1248 class SendReaderNNTPv2Tests(MockedNNTPWithReaderModeMixin,
   1249         unittest.TestCase):
   1250     """Same tests as for v2 but we tell NTTP to send MODE READER to a server
   1251     that isn't in READER mode by default."""
   1252 
   1253     nntp_version = 2
   1254     handler_class = ModeSwitchingNNTPv2Handler
   1255 
   1256     def test_we_are_in_reader_mode_after_connect(self):
   1257         self.assertIn('READER', self.server._caps)
   1258 
   1259 
   1260 class MiscTests(unittest.TestCase):
   1261 
   1262     def test_decode_header(self):
   1263         def gives(a, b):
   1264             self.assertEqual(nntplib.decode_header(a), b)
   1265         gives("" , "")
   1266         gives("a plain header", "a plain header")
   1267         gives(" with extra  spaces ", " with extra  spaces ")
   1268         gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Dbuter en Python")
   1269         gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
   1270               " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
   1271               "Re: [sqlite] problme avec ORDER BY sur des chanes de caractres accentues")
   1272         gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
   1273               "Re: problme de matrice")
   1274         # A natively utf-8 header (found in the real world!)
   1275         gives("Re: Message d'erreur incomprhensible (par moi)",
   1276               "Re: Message d'erreur incomprhensible (par moi)")
   1277 
   1278     def test_parse_overview_fmt(self):
   1279         # The minimal (default) response
   1280         lines = ["Subject:", "From:", "Date:", "Message-ID:",
   1281                  "References:", ":bytes", ":lines"]
   1282         self.assertEqual(nntplib._parse_overview_fmt(lines),
   1283             ["subject", "from", "date", "message-id", "references",
   1284              ":bytes", ":lines"])
   1285         # The minimal response using alternative names
   1286         lines = ["Subject:", "From:", "Date:", "Message-ID:",
   1287                  "References:", "Bytes:", "Lines:"]
   1288         self.assertEqual(nntplib._parse_overview_fmt(lines),
   1289             ["subject", "from", "date", "message-id", "references",
   1290              ":bytes", ":lines"])
   1291         # Variations in casing
   1292         lines = ["subject:", "FROM:", "DaTe:", "message-ID:",
   1293                  "References:", "BYTES:", "Lines:"]
   1294         self.assertEqual(nntplib._parse_overview_fmt(lines),
   1295             ["subject", "from", "date", "message-id", "references",
   1296              ":bytes", ":lines"])
   1297         # First example from RFC 3977
   1298         lines = ["Subject:", "From:", "Date:", "Message-ID:",
   1299                  "References:", ":bytes", ":lines", "Xref:full",
   1300                  "Distribution:full"]
   1301         self.assertEqual(nntplib._parse_overview_fmt(lines),
   1302             ["subject", "from", "date", "message-id", "references",
   1303              ":bytes", ":lines", "xref", "distribution"])
   1304         # Second example from RFC 3977
   1305         lines = ["Subject:", "From:", "Date:", "Message-ID:",
   1306                  "References:", "Bytes:", "Lines:", "Xref:FULL",
   1307                  "Distribution:FULL"]
   1308         self.assertEqual(nntplib._parse_overview_fmt(lines),
   1309             ["subject", "from", "date", "message-id", "references",
   1310              ":bytes", ":lines", "xref", "distribution"])
   1311         # A classic response from INN
   1312         lines = ["Subject:", "From:", "Date:", "Message-ID:",
   1313                  "References:", "Bytes:", "Lines:", "Xref:full"]
   1314         self.assertEqual(nntplib._parse_overview_fmt(lines),
   1315             ["subject", "from", "date", "message-id", "references",
   1316              ":bytes", ":lines", "xref"])
   1317 
   1318     def test_parse_overview(self):
   1319         fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
   1320         # First example from RFC 3977
   1321         lines = [
   1322             '3000234\tI am just a test article\t"Demo User" '
   1323             '<nobody (at] example.com>\t6 Oct 1998 04:38:40 -0500\t'
   1324             '<45223423 (at] example.com>\t<45454 (at] example.net>\t1234\t'
   1325             '17\tXref: news.example.com misc.test:3000363',
   1326         ]
   1327         overview = nntplib._parse_overview(lines, fmt)
   1328         (art_num, fields), = overview
   1329         self.assertEqual(art_num, 3000234)
   1330         self.assertEqual(fields, {
   1331             'subject': 'I am just a test article',
   1332             'from': '"Demo User" <nobody (at] example.com>',
   1333             'date': '6 Oct 1998 04:38:40 -0500',
   1334             'message-id': '<45223423 (at] example.com>',
   1335             'references': '<45454 (at] example.net>',
   1336             ':bytes': '1234',
   1337             ':lines': '17',
   1338             'xref': 'news.example.com misc.test:3000363',
   1339         })
   1340         # Second example; here the "Xref" field is totally absent (including
   1341         # the header name) and comes out as None
   1342         lines = [
   1343             '3000234\tI am just a test article\t"Demo User" '
   1344             '<nobody (at] example.com>\t6 Oct 1998 04:38:40 -0500\t'
   1345             '<45223423 (at] example.com>\t<45454 (at] example.net>\t1234\t'
   1346             '17\t\t',
   1347         ]
   1348         overview = nntplib._parse_overview(lines, fmt)
   1349         (art_num, fields), = overview
   1350         self.assertEqual(fields['xref'], None)
   1351         # Third example; the "Xref" is an empty string, while "references"
   1352         # is a single space.
   1353         lines = [
   1354             '3000234\tI am just a test article\t"Demo User" '
   1355             '<nobody (at] example.com>\t6 Oct 1998 04:38:40 -0500\t'
   1356             '<45223423 (at] example.com>\t \t1234\t'
   1357             '17\tXref: \t',
   1358         ]
   1359         overview = nntplib._parse_overview(lines, fmt)
   1360         (art_num, fields), = overview
   1361         self.assertEqual(fields['references'], ' ')
   1362         self.assertEqual(fields['xref'], '')
   1363 
   1364     def test_parse_datetime(self):
   1365         def gives(a, b, *c):
   1366             self.assertEqual(nntplib._parse_datetime(a, b),
   1367                              datetime.datetime(*c))
   1368         # Output of DATE command
   1369         gives("19990623135624", None, 1999, 6, 23, 13, 56, 24)
   1370         # Variations
   1371         gives("19990623", "135624", 1999, 6, 23, 13, 56, 24)
   1372         gives("990623", "135624", 1999, 6, 23, 13, 56, 24)
   1373         gives("090623", "135624", 2009, 6, 23, 13, 56, 24)
   1374 
   1375     def test_unparse_datetime(self):
   1376         # Test non-legacy mode
   1377         # 1) with a datetime
   1378         def gives(y, M, d, h, m, s, date_str, time_str):
   1379             dt = datetime.datetime(y, M, d, h, m, s)
   1380             self.assertEqual(nntplib._unparse_datetime(dt),
   1381                              (date_str, time_str))
   1382             self.assertEqual(nntplib._unparse_datetime(dt, False),
   1383                              (date_str, time_str))
   1384         gives(1999, 6, 23, 13, 56, 24, "19990623", "135624")
   1385         gives(2000, 6, 23, 13, 56, 24, "20000623", "135624")
   1386         gives(2010, 6, 5, 1, 2, 3, "20100605", "010203")
   1387         # 2) with a date
   1388         def gives(y, M, d, date_str, time_str):
   1389             dt = datetime.date(y, M, d)
   1390             self.assertEqual(nntplib._unparse_datetime(dt),
   1391                              (date_str, time_str))
   1392             self.assertEqual(nntplib._unparse_datetime(dt, False),
   1393                              (date_str, time_str))
   1394         gives(1999, 6, 23, "19990623", "000000")
   1395         gives(2000, 6, 23, "20000623", "000000")
   1396         gives(2010, 6, 5, "20100605", "000000")
   1397 
   1398     def test_unparse_datetime_legacy(self):
   1399         # Test legacy mode (RFC 977)
   1400         # 1) with a datetime
   1401         def gives(y, M, d, h, m, s, date_str, time_str):
   1402             dt = datetime.datetime(y, M, d, h, m, s)
   1403             self.assertEqual(nntplib._unparse_datetime(dt, True),
   1404                              (date_str, time_str))
   1405         gives(1999, 6, 23, 13, 56, 24, "990623", "135624")
   1406         gives(2000, 6, 23, 13, 56, 24, "000623", "135624")
   1407         gives(2010, 6, 5, 1, 2, 3, "100605", "010203")
   1408         # 2) with a date
   1409         def gives(y, M, d, date_str, time_str):
   1410             dt = datetime.date(y, M, d)
   1411             self.assertEqual(nntplib._unparse_datetime(dt, True),
   1412                              (date_str, time_str))
   1413         gives(1999, 6, 23, "990623", "000000")
   1414         gives(2000, 6, 23, "000623", "000000")
   1415         gives(2010, 6, 5, "100605", "000000")
   1416 
   1417     @unittest.skipUnless(ssl, 'requires SSL support')
   1418     def test_ssl_support(self):
   1419         self.assertTrue(hasattr(nntplib, 'NNTP_SSL'))
   1420 
   1421 
   1422 class PublicAPITests(unittest.TestCase):
   1423     """Ensures that the correct values are exposed in the public API."""
   1424 
   1425     def test_module_all_attribute(self):
   1426         self.assertTrue(hasattr(nntplib, '__all__'))
   1427         target_api = ['NNTP', 'NNTPError', 'NNTPReplyError',
   1428                       'NNTPTemporaryError', 'NNTPPermanentError',
   1429                       'NNTPProtocolError', 'NNTPDataError', 'decode_header']
   1430         if ssl is not None:
   1431             target_api.append('NNTP_SSL')
   1432         self.assertEqual(set(nntplib.__all__), set(target_api))
   1433 
   1434 class MockSocketTests(unittest.TestCase):
   1435     """Tests involving a mock socket object
   1436 
   1437     Used where the _NNTPServerIO file object is not enough."""
   1438 
   1439     nntp_class = nntplib.NNTP
   1440 
   1441     def check_constructor_error_conditions(
   1442             self, handler_class,
   1443             expected_error_type, expected_error_msg,
   1444             login=None, password=None):
   1445 
   1446         class mock_socket_module:
   1447             def create_connection(address, timeout):
   1448                 return MockSocket()
   1449 
   1450         class MockSocket:
   1451             def close(self):
   1452                 nonlocal socket_closed
   1453                 socket_closed = True
   1454 
   1455             def makefile(socket, mode):
   1456                 handler = handler_class()
   1457                 _, file = make_mock_file(handler)
   1458                 files.append(file)
   1459                 return file
   1460 
   1461         socket_closed = False
   1462         files = []
   1463         with patch('nntplib.socket', mock_socket_module), \
   1464              self.assertRaisesRegex(expected_error_type, expected_error_msg):
   1465             self.nntp_class('dummy', user=login, password=password)
   1466         self.assertTrue(socket_closed)
   1467         for f in files:
   1468             self.assertTrue(f.closed)
   1469 
   1470     def test_bad_welcome(self):
   1471         #Test a bad welcome message
   1472         class Handler(NNTPv1Handler):
   1473             welcome = 'Bad Welcome'
   1474         self.check_constructor_error_conditions(
   1475             Handler, nntplib.NNTPProtocolError, Handler.welcome)
   1476 
   1477     def test_service_temporarily_unavailable(self):
   1478         #Test service temporarily unavailable
   1479         class Handler(NNTPv1Handler):
   1480             welcome = '400 Service temporarily unavailable'
   1481         self.check_constructor_error_conditions(
   1482             Handler, nntplib.NNTPTemporaryError, Handler.welcome)
   1483 
   1484     def test_service_permanently_unavailable(self):
   1485         #Test service permanently unavailable
   1486         class Handler(NNTPv1Handler):
   1487             welcome = '502 Service permanently unavailable'
   1488         self.check_constructor_error_conditions(
   1489             Handler, nntplib.NNTPPermanentError, Handler.welcome)
   1490 
   1491     def test_bad_capabilities(self):
   1492         #Test a bad capabilities response
   1493         class Handler(NNTPv1Handler):
   1494             def handle_CAPABILITIES(self):
   1495                 self.push_lit(capabilities_response)
   1496         capabilities_response = '201 bad capability'
   1497         self.check_constructor_error_conditions(
   1498             Handler, nntplib.NNTPReplyError, capabilities_response)
   1499 
   1500     def test_login_aborted(self):
   1501         #Test a bad authinfo response
   1502         login = 't (at] e.com'
   1503         password = 'python'
   1504         class Handler(NNTPv1Handler):
   1505             def handle_AUTHINFO(self, *args):
   1506                 self.push_lit(authinfo_response)
   1507         authinfo_response = '503 Mechanism not recognized'
   1508         self.check_constructor_error_conditions(
   1509             Handler, nntplib.NNTPPermanentError, authinfo_response,
   1510             login, password)
   1511 
   1512 class bypass_context:
   1513     """Bypass encryption and actual SSL module"""
   1514     def wrap_socket(sock, **args):
   1515         return sock
   1516 
   1517 @unittest.skipUnless(ssl, 'requires SSL support')
   1518 class MockSslTests(MockSocketTests):
   1519     @staticmethod
   1520     def nntp_class(*pos, **kw):
   1521         return nntplib.NNTP_SSL(*pos, ssl_context=bypass_context, **kw)
   1522 
   1523 
   1524 class LocalServerTests(unittest.TestCase):
   1525     def setUp(self):
   1526         sock = socket.socket()
   1527         port = support.bind_port(sock)
   1528         sock.listen()
   1529         self.background = threading.Thread(
   1530             target=self.run_server, args=(sock,))
   1531         self.background.start()
   1532         self.addCleanup(self.background.join)
   1533 
   1534         self.nntp = NNTP(support.HOST, port, usenetrc=False).__enter__()
   1535         self.addCleanup(self.nntp.__exit__, None, None, None)
   1536 
   1537     def run_server(self, sock):
   1538         # Could be generalized to handle more commands in separate methods
   1539         with sock:
   1540             [client, _] = sock.accept()
   1541         with contextlib.ExitStack() as cleanup:
   1542             cleanup.enter_context(client)
   1543             reader = cleanup.enter_context(client.makefile('rb'))
   1544             client.sendall(b'200 Server ready\r\n')
   1545             while True:
   1546                 cmd = reader.readline()
   1547                 if cmd == b'CAPABILITIES\r\n':
   1548                     client.sendall(
   1549                         b'101 Capability list:\r\n'
   1550                         b'VERSION 2\r\n'
   1551                         b'STARTTLS\r\n'
   1552                         b'.\r\n'
   1553                     )
   1554                 elif cmd == b'STARTTLS\r\n':
   1555                     reader.close()
   1556                     client.sendall(b'382 Begin TLS negotiation now\r\n')
   1557                     context = ssl.SSLContext()
   1558                     context.load_cert_chain(certfile)
   1559                     client = context.wrap_socket(
   1560                         client, server_side=True)
   1561                     cleanup.enter_context(client)
   1562                     reader = cleanup.enter_context(client.makefile('rb'))
   1563                 elif cmd == b'QUIT\r\n':
   1564                     client.sendall(b'205 Bye!\r\n')
   1565                     break
   1566                 else:
   1567                     raise ValueError('Unexpected command {!r}'.format(cmd))
   1568 
   1569     @unittest.skipUnless(ssl, 'requires SSL support')
   1570     def test_starttls(self):
   1571         file = self.nntp.file
   1572         sock = self.nntp.sock
   1573         self.nntp.starttls()
   1574         # Check that the socket and internal pseudo-file really were
   1575         # changed.
   1576         self.assertNotEqual(file, self.nntp.file)
   1577         self.assertNotEqual(sock, self.nntp.sock)
   1578         # Check that the new socket really is an SSL one
   1579         self.assertIsInstance(self.nntp.sock, ssl.SSLSocket)
   1580         # Check that trying starttls when it's already active fails.
   1581         self.assertRaises(ValueError, self.nntp.starttls)
   1582 
   1583 
   1584 if __name__ == "__main__":
   1585     unittest.main()
   1586