Home | History | Annotate | Download | only in email
      1 # Copyright (C) 2001-2010 Python Software Foundation
      2 # Author: Barry Warsaw
      3 # Contact: email-sig@python.org
      4 
      5 """Miscellaneous utilities."""
      6 
# Names exported by a star-import of this module.
# NOTE(review): localtime() is defined in this file but is not listed here;
# confirm whether that omission is intentional.
__all__ = [
    'collapse_rfc2231_value',
    'decode_params',
    'decode_rfc2231',
    'encode_rfc2231',
    'formataddr',
    'formatdate',
    'format_datetime',
    'getaddresses',
    'make_msgid',
    'mktime_tz',
    'parseaddr',
    'parsedate',
    'parsedate_tz',
    'parsedate_to_datetime',
    'unquote',
    ]
     24 
     25 import os
     26 import re
     27 import time
     28 import random
     29 import socket
     30 import datetime
     31 import urllib.parse
     32 
     33 from email._parseaddr import quote
     34 from email._parseaddr import AddressList as _AddressList
     35 from email._parseaddr import mktime_tz
     36 
     37 from email._parseaddr import parsedate, parsedate_tz, _parsedate_tz
     38 
     39 # Intrapackage imports
     40 from email.charset import Charset
     41 
# String constants shared by the helpers in this module.
COMMASPACE = ', '
EMPTYSTRING = ''
UEMPTYSTRING = ''
CRLF = '\r\n'
# Separator that decode_rfc2231() splits on.
TICK = "'"

# Characters whose presence makes formataddr() wrap the display name in
# double quotes.
specialsre = re.compile(r'[][\\()<>@,:;".]')
# Backslash and double quote; formataddr() prefixes each with a backslash.
escapesre = re.compile(r'[\\"]')
     50 
     51 def _has_surrogates(s):
     52     """Return True if s contains surrogate-escaped binary data."""
     53     # This check is based on the fact that unless there are surrogates, utf8
     54     # (Python's default encoding) can encode any string.  This is the fastest
     55     # way to check for surrogates, see issue 11454 for timings.
     56     try:
     57         s.encode()
     58         return False
     59     except UnicodeEncodeError:
     60         return True
     61 
     62 # How to deal with a string containing bytes before handing it to the
     63 # application through the 'normal' interface.
     64 def _sanitize(string):
     65     # Turn any escaped bytes into unicode 'unknown' char.  If the escaped
     66     # bytes happen to be utf-8 they will instead get decoded, even if they
     67     # were invalid in the charset the source was supposed to be in.  This
     68     # seems like it is not a bad thing; a defect was still registered.
     69     original_bytes = string.encode('utf-8', 'surrogateescape')
     70     return original_bytes.decode('utf-8', 'replace')
     71 
     72 
     73 
     74 # Helpers
     75 
     76 def formataddr(pair, charset='utf-8'):
     77     """The inverse of parseaddr(), this takes a 2-tuple of the form
     78     (realname, email_address) and returns the string value suitable
     79     for an RFC 2822 From, To or Cc header.
     80 
     81     If the first element of pair is false, then the second element is
     82     returned unmodified.
     83 
     84     Optional charset if given is the character set that is used to encode
     85     realname in case realname is not ASCII safe.  Can be an instance of str or
     86     a Charset-like object which has a header_encode method.  Default is
     87     'utf-8'.
     88     """
     89     name, address = pair
     90     # The address MUST (per RFC) be ascii, so raise a UnicodeError if it isn't.
     91     address.encode('ascii')
     92     if name:
     93         try:
     94             name.encode('ascii')
     95         except UnicodeEncodeError:
     96             if isinstance(charset, str):
     97                 charset = Charset(charset)
     98             encoded_name = charset.header_encode(name)
     99             return "%s <%s>" % (encoded_name, address)
    100         else:
    101             quotes = ''
    102             if specialsre.search(name):
    103                 quotes = '"'
    104             name = escapesre.sub(r'\\\g<0>', name)
    105             return '%s%s%s <%s>' % (quotes, name, quotes, address)
    106     return address
    107 
    108 
    109 
    110 def getaddresses(fieldvalues):
    111     """Return a list of (REALNAME, EMAIL) for each fieldvalue."""
    112     all = COMMASPACE.join(fieldvalues)
    113     a = _AddressList(all)
    114     return a.addresslist
    115 
    116 
    117 
# Matches text of the form =?charset?encoding?atom?= where encoding is a
# single "q" or "b" in either case.  The named groups are charset, encoding
# and atom.  The inline comments below are part of the verbose pattern.
ecre = re.compile(r'''
  =\?                   # literal =?
  (?P<charset>[^?]*?)   # non-greedy up to the next ? is the charset
  \?                    # literal ?
  (?P<encoding>[qb])    # either a "q" or a "b", case insensitive
  \?                    # literal ?
  (?P<atom>.*?)         # non-greedy up to the next ?= is the atom
  \?=                   # literal ?=
  ''', re.VERBOSE | re.IGNORECASE)
    127 
    128 
    129 def _format_timetuple_and_zone(timetuple, zone):
    130     return '%s, %02d %s %04d %02d:%02d:%02d %s' % (
    131         ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'][timetuple[6]],
    132         timetuple[2],
    133         ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
    134          'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'][timetuple[1] - 1],
    135         timetuple[0], timetuple[3], timetuple[4], timetuple[5],
    136         zone)
    137 
    138 def formatdate(timeval=None, localtime=False, usegmt=False):
    139     """Returns a date string as specified by RFC 2822, e.g.:
    140 
    141     Fri, 09 Nov 2001 01:08:47 -0000
    142 
    143     Optional timeval if given is a floating point time value as accepted by
    144     gmtime() and localtime(), otherwise the current time is used.
    145 
    146     Optional localtime is a flag that when True, interprets timeval, and
    147     returns a date relative to the local timezone instead of UTC, properly
    148     taking daylight savings time into account.
    149 
    150     Optional argument usegmt means that the timezone is written out as
    151     an ascii string, not numeric one (so "GMT" instead of "+0000"). This
    152     is needed for HTTP, and is only used when localtime==False.
    153     """
    154     # Note: we cannot use strftime() because that honors the locale and RFC
    155     # 2822 requires that day and month names be the English abbreviations.
    156     if timeval is None:
    157         timeval = time.time()
    158     if localtime or usegmt:
    159         dt = datetime.datetime.fromtimestamp(timeval, datetime.timezone.utc)
    160     else:
    161         dt = datetime.datetime.utcfromtimestamp(timeval)
    162     if localtime:
    163         dt = dt.astimezone()
    164         usegmt = False
    165     return format_datetime(dt, usegmt)
    166 
    167 def format_datetime(dt, usegmt=False):
    168     """Turn a datetime into a date string as specified in RFC 2822.
    169 
    170     If usegmt is True, dt must be an aware datetime with an offset of zero.  In
    171     this case 'GMT' will be rendered instead of the normal +0000 required by
    172     RFC2822.  This is to support HTTP headers involving date stamps.
    173     """
    174     now = dt.timetuple()
    175     if usegmt:
    176         if dt.tzinfo is None or dt.tzinfo != datetime.timezone.utc:
    177             raise ValueError("usegmt option requires a UTC datetime")
    178         zone = 'GMT'
    179     elif dt.tzinfo is None:
    180         zone = '-0000'
    181     else:
    182         zone = dt.strftime("%z")
    183     return _format_timetuple_and_zone(now, zone)
    184 
    185 
    186 def make_msgid(idstring=None, domain=None):
    187     """Returns a string suitable for RFC 2822 compliant Message-ID, e.g:
    188 
    189     <142480216486.20800.16526388040877946887 (at] nightshade.la.mastaler.com>
    190 
    191     Optional idstring if given is a string used to strengthen the
    192     uniqueness of the message id.  Optional domain if given provides the
    193     portion of the message id after the '@'.  It defaults to the locally
    194     defined hostname.
    195     """
    196     timeval = int(time.time()*100)
    197     pid = os.getpid()
    198     randint = random.getrandbits(64)
    199     if idstring is None:
    200         idstring = ''
    201     else:
    202         idstring = '.' + idstring
    203     if domain is None:
    204         domain = socket.getfqdn()
    205     msgid = '<%d.%d.%d%s@%s>' % (timeval, pid, randint, idstring, domain)
    206     return msgid
    207 
    208 
    209 def parsedate_to_datetime(data):
    210     *dtuple, tz = _parsedate_tz(data)
    211     if tz is None:
    212         return datetime.datetime(*dtuple[:6])
    213     return datetime.datetime(*dtuple[:6],
    214             tzinfo=datetime.timezone(datetime.timedelta(seconds=tz)))
    215 
    216 
    217 def parseaddr(addr):
    218     addrs = _AddressList(addr).addresslist
    219     if not addrs:
    220         return '', ''
    221     return addrs[0]
    222 
    223 
    224 # rfc822.unquote() doesn't properly de-backslash-ify in Python pre-2.3.
    225 def unquote(str):
    226     """Remove quotes from a string."""
    227     if len(str) > 1:
    228         if str.startswith('"') and str.endswith('"'):
    229             return str[1:-1].replace('\\\\', '\\').replace('\\"', '"')
    230         if str.startswith('<') and str.endswith('>'):
    231             return str[1:-1]
    232     return str
    233 
    234 
    235 
    236 # RFC2231-related functions - parameter encoding and decoding
    237 def decode_rfc2231(s):
    238     """Decode string according to RFC 2231"""
    239     parts = s.split(TICK, 2)
    240     if len(parts) <= 2:
    241         return None, None, s
    242     return parts
    243 
    244 
    245 def encode_rfc2231(s, charset=None, language=None):
    246     """Encode string according to RFC 2231.
    247 
    248     If neither charset nor language is given, then s is returned as-is.  If
    249     charset is given but not language, the string is encoded using the empty
    250     string for language.
    251     """
    252     s = urllib.parse.quote(s, safe='', encoding=charset or 'ascii')
    253     if charset is None and language is None:
    254         return s
    255     if language is None:
    256         language = ''
    257     return "%s'%s'%s" % (charset, language, s)
    258 
    259 
# Matches parameter names that carry a '*' marker: "name*", "name*N" or
# "name*N*".  Group 'name' is the bare parameter name; group 'num' is the
# section number N, or None when absent.  Used by decode_params().
rfc2231_continuation = re.compile(r'^(?P<name>\w+)\*((?P<num>[0-9]+)\*?)?$',
    re.ASCII)
    262 
    263 def decode_params(params):
    264     """Decode parameters list according to RFC 2231.
    265 
    266     params is a sequence of 2-tuples containing (param name, string value).
    267     """
    268     # Copy params so we don't mess with the original
    269     params = params[:]
    270     new_params = []
    271     # Map parameter's name to a list of continuations.  The values are a
    272     # 3-tuple of the continuation number, the string value, and a flag
    273     # specifying whether a particular segment is %-encoded.
    274     rfc2231_params = {}
    275     name, value = params.pop(0)
    276     new_params.append((name, value))
    277     while params:
    278         name, value = params.pop(0)
    279         if name.endswith('*'):
    280             encoded = True
    281         else:
    282             encoded = False
    283         value = unquote(value)
    284         mo = rfc2231_continuation.match(name)
    285         if mo:
    286             name, num = mo.group('name', 'num')
    287             if num is not None:
    288                 num = int(num)
    289             rfc2231_params.setdefault(name, []).append((num, value, encoded))
    290         else:
    291             new_params.append((name, '"%s"' % quote(value)))
    292     if rfc2231_params:
    293         for name, continuations in rfc2231_params.items():
    294             value = []
    295             extended = False
    296             # Sort by number
    297             continuations.sort()
    298             # And now append all values in numerical order, converting
    299             # %-encodings for the encoded segments.  If any of the
    300             # continuation names ends in a *, then the entire string, after
    301             # decoding segments and concatenating, must have the charset and
    302             # language specifiers at the beginning of the string.
    303             for num, s, encoded in continuations:
    304                 if encoded:
    305                     # Decode as "latin-1", so the characters in s directly
    306                     # represent the percent-encoded octet values.
    307                     # collapse_rfc2231_value treats this as an octet sequence.
    308                     s = urllib.parse.unquote(s, encoding="latin-1")
    309                     extended = True
    310                 value.append(s)
    311             value = quote(EMPTYSTRING.join(value))
    312             if extended:
    313                 charset, language, value = decode_rfc2231(value)
    314                 new_params.append((name, (charset, language, '"%s"' % value)))
    315             else:
    316                 new_params.append((name, '"%s"' % value))
    317     return new_params
    318 
    319 def collapse_rfc2231_value(value, errors='replace',
    320                            fallback_charset='us-ascii'):
    321     if not isinstance(value, tuple) or len(value) != 3:
    322         return unquote(value)
    323     # While value comes to us as a unicode string, we need it to be a bytes
    324     # object.  We do not want bytes() normal utf-8 decoder, we want a straight
    325     # interpretation of the string as character bytes.
    326     charset, language, text = value
    327     if charset is None:
    328         # Issue 17369: if charset/lang is None, decode_rfc2231 couldn't parse
    329         # the value, so use the fallback_charset.
    330         charset = fallback_charset
    331     rawbytes = bytes(text, 'raw-unicode-escape')
    332     try:
    333         return str(rawbytes, charset, errors)
    334     except LookupError:
    335         # charset is not a known codec.
    336         return unquote(text)
    337 
    338 
    339 #
    340 # datetime doesn't provide a localtime function yet, so provide one.  Code
    341 # adapted from the patch in issue 9527.  This may not be perfect, but it is
    342 # better than not having it.
    343 #
    344 
    345 def localtime(dt=None, isdst=-1):
    346     """Return local time as an aware datetime object.
    347 
    348     If called without arguments, return current time.  Otherwise *dt*
    349     argument should be a datetime instance, and it is converted to the
    350     local time zone according to the system time zone database.  If *dt* is
    351     naive (that is, dt.tzinfo is None), it is assumed to be in local time.
    352     In this case, a positive or zero value for *isdst* causes localtime to
    353     presume initially that summer time (for example, Daylight Saving Time)
    354     is or is not (respectively) in effect for the specified time.  A
    355     negative value for *isdst* causes the localtime() function to attempt
    356     to divine whether summer time is in effect for the specified time.
    357 
    358     """
    359     if dt is None:
    360         return datetime.datetime.now(datetime.timezone.utc).astimezone()
    361     if dt.tzinfo is not None:
    362         return dt.astimezone()
    363     # We have a naive datetime.  Convert to a (localtime) timetuple and pass to
    364     # system mktime together with the isdst hint.  System mktime will return
    365     # seconds since epoch.
    366     tm = dt.timetuple()[:-1] + (isdst,)
    367     seconds = time.mktime(tm)
    368     localtm = time.localtime(seconds)
    369     try:
    370         delta = datetime.timedelta(seconds=localtm.tm_gmtoff)
    371         tz = datetime.timezone(delta, localtm.tm_zone)
    372     except AttributeError:
    373         # Compute UTC offset and compare with the value implied by tm_isdst.
    374         # If the values match, use the zone name implied by tm_isdst.
    375         delta = dt - datetime.datetime(*time.gmtime(seconds)[:6])
    376         dst = time.daylight and localtm.tm_isdst > 0
    377         gmtoff = -(time.altzone if dst else time.timezone)
    378         if delta == datetime.timedelta(seconds=gmtoff):
    379             tz = datetime.timezone(delta, time.tzname[dst])
    380         else:
    381             tz = datetime.timezone(delta)
    382     return dt.replace(tzinfo=tz)
    383