Home | History | Annotate | Download | only in email
      1 # Copyright (C) 2002-2006 Python Software Foundation
      2 # Author: Ben Gertzfield, Barry Warsaw
# Contact: email-sig@python.org
      4 
      5 """Header encoding and decoding functionality."""
      6 
      7 __all__ = [
      8     'Header',
      9     'decode_header',
     10     'make_header',
     11     ]
     12 
     13 import re
     14 import binascii
     15 
     16 import email.quoprimime
     17 import email.base64mime
     18 
     19 from email.errors import HeaderParseError
     20 from email.charset import Charset
     21 
# String building blocks shared by the functions below.
NL = '\n'
SPACE = ' '
USPACE = u' '
# Width used when a tab in the continuation whitespace is counted in columns.
SPACE8 = ' ' * 8
UEMPTYSTRING = u''

# Line length applied by Header when the caller passes maxlinelen=None.
MAXLINELEN = 76

# Charset instances used as the default charset and as encoding fallbacks in
# Header.append().
USASCII = Charset('us-ascii')
UTF8 = Charset('utf-8')

# Match encoded-word strings in the form =?charset?q?Hello_World?=
# The three named groups (charset, encoding, encoded) are what
# decode_header() reads back out of ecre.split().
ecre = re.compile(r'''
  =\?                   # literal =?
  (?P<charset>[^?]*?)   # non-greedy up to the next ? is the charset
  \?                    # literal ?
  (?P<encoding>[qb])    # either a "q" or a "b", case insensitive
  \?                    # literal ?
  (?P<encoded>.*?)      # non-greedy up to the next ?= is the encoded string
  \?=                   # literal ?=
  (?=[ \t]|$)           # whitespace or the end of the string
  ''', re.VERBOSE | re.IGNORECASE | re.MULTILINE)

# Field name regexp, including trailing colon, but not separating whitespace,
# according to RFC 2822.  Character range is from exclamation mark (\041) to
# tilde (\176).  For use with .match()
fcre = re.compile(r'[\041-\176]+:$')

# Find a header embedded in a putative header value.  Used to check for
# header injection attack.  Matches a newline followed by a run of characters
# other than space and tab, then a colon.  (The name is spelled "embeded"
# here and wherever it is referenced.)
_embeded_header = re.compile(r'\n[^ \t]+:')



# Helpers
# Local alias for the quoprimime helper that Header._encode_chunks() calls to
# add each encoded chunk to the list of output lines.
_max_append = email.quoprimime._max_append
     59 
     60 
     61 
     63 def decode_header(header):
     64     """Decode a message header value without converting charset.
     65 
     66     Returns a list of (decoded_string, charset) pairs containing each of the
     67     decoded parts of the header.  Charset is None for non-encoded parts of the
     68     header, otherwise a lower-case string containing the name of the character
     69     set specified in the encoded string.
     70 
     71     An email.errors.HeaderParseError may be raised when certain decoding error
     72     occurs (e.g. a base64 decoding exception).
     73     """
     74     # If no encoding, just return the header
     75     header = str(header)
     76     if not ecre.search(header):
     77         return [(header, None)]
     78     decoded = []
     79     dec = ''
     80     for line in header.splitlines():
     81         # This line might not have an encoding in it
     82         if not ecre.search(line):
     83             decoded.append((line, None))
     84             continue
     85         parts = ecre.split(line)
     86         while parts:
     87             unenc = parts.pop(0).strip()
     88             if unenc:
     89                 # Should we continue a long line?
     90                 if decoded and decoded[-1][1] is None:
     91                     decoded[-1] = (decoded[-1][0] + SPACE + unenc, None)
     92                 else:
     93                     decoded.append((unenc, None))
     94             if parts:
     95                 charset, encoding = [s.lower() for s in parts[0:2]]
     96                 encoded = parts[2]
     97                 dec = None
     98                 if encoding == 'q':
     99                     dec = email.quoprimime.header_decode(encoded)
    100                 elif encoding == 'b':
    101                     paderr = len(encoded) % 4   # Postel's law: add missing padding
    102                     if paderr:
    103                         encoded += '==='[:4 - paderr]
    104                     try:
    105                         dec = email.base64mime.decode(encoded)
    106                     except binascii.Error:
    107                         # Turn this into a higher level exception.  BAW: Right
    108                         # now we throw the lower level exception away but
    109                         # when/if we get exception chaining, we'll preserve it.
    110                         raise HeaderParseError
    111                 if dec is None:
    112                     dec = encoded
    113 
    114                 if decoded and decoded[-1][1] == charset:
    115                     decoded[-1] = (decoded[-1][0] + dec, decoded[-1][1])
    116                 else:
    117                     decoded.append((dec, charset))
    118             del parts[0:3]
    119     return decoded
    120 
    121 
    122 
    124 def make_header(decoded_seq, maxlinelen=None, header_name=None,
    125                 continuation_ws=' '):
    126     """Create a Header from a sequence of pairs as returned by decode_header()
    127 
    128     decode_header() takes a header value string and returns a sequence of
    129     pairs of the format (decoded_string, charset) where charset is the string
    130     name of the character set.
    131 
    132     This function takes one of those sequence of pairs and returns a Header
    133     instance.  Optional maxlinelen, header_name, and continuation_ws are as in
    134     the Header constructor.
    135     """
    136     h = Header(maxlinelen=maxlinelen, header_name=header_name,
    137                continuation_ws=continuation_ws)
    138     for s, charset in decoded_seq:
    139         # None means us-ascii but we can simply pass it on to h.append()
    140         if charset is not None and not isinstance(charset, Charset):
    141             charset = Charset(charset)
    142         h.append(s, charset)
    143     return h
    144 
    145 
    146 
    148 class Header:
    149     def __init__(self, s=None, charset=None,
    150                  maxlinelen=None, header_name=None,
    151                  continuation_ws=' ', errors='strict'):
    152         """Create a MIME-compliant header that can contain many character sets.
    153 
    154         Optional s is the initial header value.  If None, the initial header
    155         value is not set.  You can later append to the header with .append()
    156         method calls.  s may be a byte string or a Unicode string, but see the
    157         .append() documentation for semantics.
    158 
    159         Optional charset serves two purposes: it has the same meaning as the
    160         charset argument to the .append() method.  It also sets the default
    161         character set for all subsequent .append() calls that omit the charset
    162         argument.  If charset is not provided in the constructor, the us-ascii
    163         charset is used both as s's initial charset and as the default for
    164         subsequent .append() calls.
    165 
    166         The maximum line length can be specified explicit via maxlinelen.  For
    167         splitting the first line to a shorter value (to account for the field
    168         header which isn't included in s, e.g. `Subject') pass in the name of
    169         the field in header_name.  The default maxlinelen is 76.
    170 
    171         continuation_ws must be RFC 2822 compliant folding whitespace (usually
    172         either a space or a hard tab) which will be prepended to continuation
    173         lines.
    174 
    175         errors is passed through to the .append() call.
    176         """
    177         if charset is None:
    178             charset = USASCII
    179         if not isinstance(charset, Charset):
    180             charset = Charset(charset)
    181         self._charset = charset
    182         self._continuation_ws = continuation_ws
    183         cws_expanded_len = len(continuation_ws.replace('\t', SPACE8))
    184         # BAW: I believe `chunks' and `maxlinelen' should be non-public.
    185         self._chunks = []
    186         if s is not None:
    187             self.append(s, charset, errors)
    188         if maxlinelen is None:
    189             maxlinelen = MAXLINELEN
    190         if header_name is None:
    191             # We don't know anything about the field header so the first line
    192             # is the same length as subsequent lines.
    193             self._firstlinelen = maxlinelen
    194         else:
    195             # The first line should be shorter to take into account the field
    196             # header.  Also subtract off 2 extra for the colon and space.
    197             self._firstlinelen = maxlinelen - len(header_name) - 2
    198         # Second and subsequent lines should subtract off the length in
    199         # columns of the continuation whitespace prefix.
    200         self._maxlinelen = maxlinelen - cws_expanded_len
    201 
    202     def __str__(self):
    203         """A synonym for self.encode()."""
    204         return self.encode()
    205 
    206     def __unicode__(self):
    207         """Helper for the built-in unicode function."""
    208         uchunks = []
    209         lastcs = None
    210         for s, charset in self._chunks:
    211             # We must preserve spaces between encoded and non-encoded word
    212             # boundaries, which means for us we need to add a space when we go
    213             # from a charset to None/us-ascii, or from None/us-ascii to a
    214             # charset.  Only do this for the second and subsequent chunks.
    215             nextcs = charset
    216             if uchunks:
    217                 if lastcs not in (None, 'us-ascii'):
    218                     if nextcs in (None, 'us-ascii'):
    219                         uchunks.append(USPACE)
    220                         nextcs = None
    221                 elif nextcs not in (None, 'us-ascii'):
    222                     uchunks.append(USPACE)
    223             lastcs = nextcs
    224             uchunks.append(unicode(s, str(charset)))
    225         return UEMPTYSTRING.join(uchunks)
    226 
    227     # Rich comparison operators for equality only.  BAW: does it make sense to
    228     # have or explicitly disable <, <=, >, >= operators?
    229     def __eq__(self, other):
    230         # other may be a Header or a string.  Both are fine so coerce
    231         # ourselves to a string, swap the args and do another comparison.
    232         return other == self.encode()
    233 
    234     def __ne__(self, other):
    235         return not self == other
    236 
    237     def append(self, s, charset=None, errors='strict'):
    238         """Append a string to the MIME header.
    239 
    240         Optional charset, if given, should be a Charset instance or the name
    241         of a character set (which will be converted to a Charset instance).  A
    242         value of None (the default) means that the charset given in the
    243         constructor is used.
    244 
    245         s may be a byte string or a Unicode string.  If it is a byte string
    246         (i.e. isinstance(s, str) is true), then charset is the encoding of
    247         that byte string, and a UnicodeError will be raised if the string
    248         cannot be decoded with that charset.  If s is a Unicode string, then
    249         charset is a hint specifying the character set of the characters in
    250         the string.  In this case, when producing an RFC 2822 compliant header
    251         using RFC 2047 rules, the Unicode string will be encoded using the
    252         following charsets in order: us-ascii, the charset hint, utf-8.  The
    253         first character set not to provoke a UnicodeError is used.
    254 
    255         Optional `errors' is passed as the third argument to any unicode() or
    256         ustr.encode() call.
    257         """
    258         if charset is None:
    259             charset = self._charset
    260         elif not isinstance(charset, Charset):
    261             charset = Charset(charset)
    262         # If the charset is our faux 8bit charset, leave the string unchanged
    263         if charset != '8bit':
    264             # We need to test that the string can be converted to unicode and
    265             # back to a byte string, given the input and output codecs of the
    266             # charset.
    267             if isinstance(s, str):
    268                 # Possibly raise UnicodeError if the byte string can't be
    269                 # converted to a unicode with the input codec of the charset.
    270                 incodec = charset.input_codec or 'us-ascii'
    271                 ustr = unicode(s, incodec, errors)
    272                 # Now make sure that the unicode could be converted back to a
    273                 # byte string with the output codec, which may be different
    274                 # than the iput coded.  Still, use the original byte string.
    275                 outcodec = charset.output_codec or 'us-ascii'
    276                 ustr.encode(outcodec, errors)
    277             elif isinstance(s, unicode):
    278                 # Now we have to be sure the unicode string can be converted
    279                 # to a byte string with a reasonable output codec.  We want to
    280                 # use the byte string in the chunk.
    281                 for charset in USASCII, charset, UTF8:
    282                     try:
    283                         outcodec = charset.output_codec or 'us-ascii'
    284                         s = s.encode(outcodec, errors)
    285                         break
    286                     except UnicodeError:
    287                         pass
    288                 else:
    289                     assert False, 'utf-8 conversion failed'
    290         self._chunks.append((s, charset))
    291 
    292     def _split(self, s, charset, maxlinelen, splitchars):
    293         # Split up a header safely for use with encode_chunks.
    294         splittable = charset.to_splittable(s)
    295         encoded = charset.from_splittable(splittable, True)
    296         elen = charset.encoded_header_len(encoded)
    297         # If the line's encoded length first, just return it
    298         if elen <= maxlinelen:
    299             return [(encoded, charset)]
    300         # If we have undetermined raw 8bit characters sitting in a byte
    301         # string, we really don't know what the right thing to do is.  We
    302         # can't really split it because it might be multibyte data which we
    303         # could break if we split it between pairs.  The least harm seems to
    304         # be to not split the header at all, but that means they could go out
    305         # longer than maxlinelen.
    306         if charset == '8bit':
    307             return [(s, charset)]
    308         # BAW: I'm not sure what the right test here is.  What we're trying to
    309         # do is be faithful to RFC 2822's recommendation that ($2.2.3):
    310         #
    311         # "Note: Though structured field bodies are defined in such a way that
    312         #  folding can take place between many of the lexical tokens (and even
    313         #  within some of the lexical tokens), folding SHOULD be limited to
    314         #  placing the CRLF at higher-level syntactic breaks."
    315         #
    316         # For now, I can only imagine doing this when the charset is us-ascii,
    317         # although it's possible that other charsets may also benefit from the
    318         # higher-level syntactic breaks.
    319         elif charset == 'us-ascii':
    320             return self._split_ascii(s, charset, maxlinelen, splitchars)
    321         # BAW: should we use encoded?
    322         elif elen == len(s):
    323             # We can split on _maxlinelen boundaries because we know that the
    324             # encoding won't change the size of the string
    325             splitpnt = maxlinelen
    326             first = charset.from_splittable(splittable[:splitpnt], False)
    327             last = charset.from_splittable(splittable[splitpnt:], False)
    328         else:
    329             # Binary search for split point
    330             first, last = _binsplit(splittable, charset, maxlinelen)
    331         # first is of the proper length so just wrap it in the appropriate
    332         # chrome.  last must be recursively split.
    333         fsplittable = charset.to_splittable(first)
    334         fencoded = charset.from_splittable(fsplittable, True)
    335         chunk = [(fencoded, charset)]
    336         return chunk + self._split(last, charset, self._maxlinelen, splitchars)
    337 
    338     def _split_ascii(self, s, charset, firstlen, splitchars):
    339         chunks = _split_ascii(s, firstlen, self._maxlinelen,
    340                               self._continuation_ws, splitchars)
    341         return zip(chunks, [charset]*len(chunks))
    342 
    343     def _encode_chunks(self, newchunks, maxlinelen):
    344         # MIME-encode a header with many different charsets and/or encodings.
    345         #
    346         # Given a list of pairs (string, charset), return a MIME-encoded
    347         # string suitable for use in a header field.  Each pair may have
    348         # different charsets and/or encodings, and the resulting header will
    349         # accurately reflect each setting.
    350         #
    351         # Each encoding can be email.utils.QP (quoted-printable, for
    352         # ASCII-like character sets like iso-8859-1), email.utils.BASE64
    353         # (Base64, for non-ASCII like character sets like KOI8-R and
    354         # iso-2022-jp), or None (no encoding).
    355         #
    356         # Each pair will be represented on a separate line; the resulting
    357         # string will be in the format:
    358         #
    359         # =?charset1?q?Mar=EDa_Gonz=E1lez_Alonso?=\n
    360         #  =?charset2?b?SvxyZ2VuIEL2aW5n?="
    361         chunks = []
    362         for header, charset in newchunks:
    363             if not header:
    364                 continue
    365             if charset is None or charset.header_encoding is None:
    366                 s = header
    367             else:
    368                 s = charset.header_encode(header)
    369             # Don't add more folding whitespace than necessary
    370             if chunks and chunks[-1].endswith(' '):
    371                 extra = ''
    372             else:
    373                 extra = ' '
    374             _max_append(chunks, s, maxlinelen, extra)
    375         joiner = NL + self._continuation_ws
    376         return joiner.join(chunks)
    377 
    378     def encode(self, splitchars=';, '):
    379         """Encode a message header into an RFC-compliant format.
    380 
    381         There are many issues involved in converting a given string for use in
    382         an email header.  Only certain character sets are readable in most
    383         email clients, and as header strings can only contain a subset of
    384         7-bit ASCII, care must be taken to properly convert and encode (with
    385         Base64 or quoted-printable) header strings.  In addition, there is a
    386         75-character length limit on any given encoded header field, so
    387         line-wrapping must be performed, even with double-byte character sets.
    388 
    389         This method will do its best to convert the string to the correct
    390         character set used in email, and encode and line wrap it safely with
    391         the appropriate scheme for that character set.
    392 
    393         If the given charset is not known or an error occurs during
    394         conversion, this function will return the header untouched.
    395 
    396         Optional splitchars is a string containing characters to split long
    397         ASCII lines on, in rough support of RFC 2822's `highest level
    398         syntactic breaks'.  This doesn't affect RFC 2047 encoded lines.
    399         """
    400         newchunks = []
    401         maxlinelen = self._firstlinelen
    402         lastlen = 0
    403         for s, charset in self._chunks:
    404             # The first bit of the next chunk should be just long enough to
    405             # fill the next line.  Don't forget the space separating the
    406             # encoded words.
    407             targetlen = maxlinelen - lastlen - 1
    408             if targetlen < charset.encoded_header_len(''):
    409                 # Stick it on the next line
    410                 targetlen = maxlinelen
    411             newchunks += self._split(s, charset, targetlen, splitchars)
    412             lastchunk, lastcharset = newchunks[-1]
    413             lastlen = lastcharset.encoded_header_len(lastchunk)
    414         value = self._encode_chunks(newchunks, maxlinelen)
    415         if _embeded_header.search(value):
    416             raise HeaderParseError("header value appears to contain "
    417                 "an embedded header: {!r}".format(value))
    418         return value
    419 
    420 
    421 
    423 def _split_ascii(s, firstlen, restlen, continuation_ws, splitchars):
    424     lines = []
    425     maxlen = firstlen
    426     for line in s.splitlines():
    427         # Ignore any leading whitespace (i.e. continuation whitespace) already
    428         # on the line, since we'll be adding our own.
    429         line = line.lstrip()
    430         if len(line) < maxlen:
    431             lines.append(line)
    432             maxlen = restlen
    433             continue
    434         # Attempt to split the line at the highest-level syntactic break
    435         # possible.  Note that we don't have a lot of smarts about field
    436         # syntax; we just try to break on semi-colons, then commas, then
    437         # whitespace.
    438         for ch in splitchars:
    439             if ch in line:
    440                 break
    441         else:
    442             # There's nothing useful to split the line on, not even spaces, so
    443             # just append this line unchanged
    444             lines.append(line)
    445             maxlen = restlen
    446             continue
    447         # Now split the line on the character plus trailing whitespace
    448         cre = re.compile(r'%s\s*' % ch)
    449         if ch in ';,':
    450             eol = ch
    451         else:
    452             eol = ''
    453         joiner = eol + ' '
    454         joinlen = len(joiner)
    455         wslen = len(continuation_ws.replace('\t', SPACE8))
    456         this = []
    457         linelen = 0
    458         for part in cre.split(line):
    459             curlen = linelen + max(0, len(this)-1) * joinlen
    460             partlen = len(part)
    461             onfirstline = not lines
    462             # We don't want to split after the field name, if we're on the
    463             # first line and the field name is present in the header string.
    464             if ch == ' ' and onfirstline and \
    465                    len(this) == 1 and fcre.match(this[0]):
    466                 this.append(part)
    467                 linelen += partlen
    468             elif curlen + partlen > maxlen:
    469                 if this:
    470                     lines.append(joiner.join(this) + eol)
    471                 # If this part is longer than maxlen and we aren't already
    472                 # splitting on whitespace, try to recursively split this line
    473                 # on whitespace.
    474                 if partlen > maxlen and ch != ' ':
    475                     subl = _split_ascii(part, maxlen, restlen,
    476                                         continuation_ws, ' ')
    477                     lines.extend(subl[:-1])
    478                     this = [subl[-1]]
    479                 else:
    480                     this = [part]
    481                 linelen = wslen + len(this[-1])
    482                 maxlen = restlen
    483             else:
    484                 this.append(part)
    485                 linelen += partlen
    486         # Put any left over parts on a line by themselves
    487         if this:
    488             lines.append(joiner.join(this))
    489     return lines
    490 
    491 
    492 
    494 def _binsplit(splittable, charset, maxlinelen):
    495     i = 0
    496     j = len(splittable)
    497     while i < j:
    498         # Invariants:
    499         # 1. splittable[:k] fits for all k <= i (note that we *assume*,
    500         #    at the start, that splittable[:0] fits).
    501         # 2. splittable[:k] does not fit for any k > j (at the start,
    502         #    this means we shouldn't look at any k > len(splittable)).
    503         # 3. We don't know about splittable[:k] for k in i+1..j.
    504         # 4. We want to set i to the largest k that fits, with i <= k <= j.
    505         #
    506         m = (i+j+1) >> 1  # ceiling((i+j)/2); i < m <= j
    507         chunk = charset.from_splittable(splittable[:m], True)
    508         chunklen = charset.encoded_header_len(chunk)
    509         if chunklen <= maxlinelen:
    510             # m is acceptable, so is a new lower bound.
    511             i = m
    512         else:
    513             # m is not acceptable, so final i must be < m.
    514             j = m - 1
    515     # i == j.  Invariant #1 implies that splittable[:i] fits, and
    516     # invariant #2 implies that splittable[:i+1] does not fit, so i
    517     # is what we're looking for.
    518     first = charset.from_splittable(splittable[:i], False)
    519     last  = charset.from_splittable(splittable[i:], False)
    520     return first, last
    521