1 # Copyright (C) 2001-2010 Python Software Foundation 2 # Author: Barry Warsaw 3 # Contact: email-sig (at] python.org 4 5 """Miscellaneous utilities.""" 6 7 __all__ = [ 8 'collapse_rfc2231_value', 9 'decode_params', 10 'decode_rfc2231', 11 'encode_rfc2231', 12 'formataddr', 13 'formatdate', 14 'format_datetime', 15 'getaddresses', 16 'make_msgid', 17 'mktime_tz', 18 'parseaddr', 19 'parsedate', 20 'parsedate_tz', 21 'parsedate_to_datetime', 22 'unquote', 23 ] 24 25 import os 26 import re 27 import time 28 import random 29 import socket 30 import datetime 31 import urllib.parse 32 33 from email._parseaddr import quote 34 from email._parseaddr import AddressList as _AddressList 35 from email._parseaddr import mktime_tz 36 37 from email._parseaddr import parsedate, parsedate_tz, _parsedate_tz 38 39 # Intrapackage imports 40 from email.charset import Charset 41 42 COMMASPACE = ', ' 43 EMPTYSTRING = '' 44 UEMPTYSTRING = '' 45 CRLF = '\r\n' 46 TICK = "'" 47 48 specialsre = re.compile(r'[][\\()<>@,:;".]') 49 escapesre = re.compile(r'[\\"]') 50 51 def _has_surrogates(s): 52 """Return True if s contains surrogate-escaped binary data.""" 53 # This check is based on the fact that unless there are surrogates, utf8 54 # (Python's default encoding) can encode any string. This is the fastest 55 # way to check for surrogates, see issue 11454 for timings. 56 try: 57 s.encode() 58 return False 59 except UnicodeEncodeError: 60 return True 61 62 # How to deal with a string containing bytes before handing it to the 63 # application through the 'normal' interface. 64 def _sanitize(string): 65 # Turn any escaped bytes into unicode 'unknown' char. If the escaped 66 # bytes happen to be utf-8 they will instead get decoded, even if they 67 # were invalid in the charset the source was supposed to be in. This 68 # seems like it is not a bad thing; a defect was still registered. 69 original_bytes = string.encode('utf-8', 'surrogateescape') 70 return original_bytes.decode('utf-8', 'replace') 71 72 73 74 # Helpers 75 76 def formataddr(pair, charset='utf-8'): 77 """The inverse of parseaddr(), this takes a 2-tuple of the form 78 (realname, email_address) and returns the string value suitable 79 for an RFC 2822 From, To or Cc header. 80 81 If the first element of pair is false, then the second element is 82 returned unmodified. 83 84 Optional charset if given is the character set that is used to encode 85 realname in case realname is not ASCII safe. Can be an instance of str or 86 a Charset-like object which has a header_encode method. Default is 87 'utf-8'. 88 """ 89 name, address = pair 90 # The address MUST (per RFC) be ascii, so raise a UnicodeError if it isn't. 91 address.encode('ascii') 92 if name: 93 try: 94 name.encode('ascii') 95 except UnicodeEncodeError: 96 if isinstance(charset, str): 97 charset = Charset(charset) 98 encoded_name = charset.header_encode(name) 99 return "%s <%s>" % (encoded_name, address) 100 else: 101 quotes = '' 102 if specialsre.search(name): 103 quotes = '"' 104 name = escapesre.sub(r'\\\g<0>', name) 105 return '%s%s%s <%s>' % (quotes, name, quotes, address) 106 return address 107 108 109 110 def getaddresses(fieldvalues): 111 """Return a list of (REALNAME, EMAIL) for each fieldvalue.""" 112 all = COMMASPACE.join(fieldvalues) 113 a = _AddressList(all) 114 return a.addresslist 115 116 117 118 ecre = re.compile(r''' 119 =\? # literal =? 120 (?P<charset>[^?]*?) # non-greedy up to the next ? is the charset 121 \? # literal ? 122 (?P<encoding>[qb]) # either a "q" or a "b", case insensitive 123 \? # literal ? 124 (?P<atom>.*?) # non-greedy up to the next ?= is the atom 125 \?= # literal ?= 126 ''', re.VERBOSE | re.IGNORECASE) 127 128 129 def _format_timetuple_and_zone(timetuple, zone): 130 return '%s, %02d %s %04d %02d:%02d:%02d %s' % ( 131 ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'][timetuple[6]], 132 timetuple[2], 133 ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 134 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'][timetuple[1] - 1], 135 timetuple[0], timetuple[3], timetuple[4], timetuple[5], 136 zone) 137 138 def formatdate(timeval=None, localtime=False, usegmt=False): 139 """Returns a date string as specified by RFC 2822, e.g.: 140 141 Fri, 09 Nov 2001 01:08:47 -0000 142 143 Optional timeval if given is a floating point time value as accepted by 144 gmtime() and localtime(), otherwise the current time is used. 145 146 Optional localtime is a flag that when True, interprets timeval, and 147 returns a date relative to the local timezone instead of UTC, properly 148 taking daylight savings time into account. 149 150 Optional argument usegmt means that the timezone is written out as 151 an ascii string, not numeric one (so "GMT" instead of "+0000"). This 152 is needed for HTTP, and is only used when localtime==False. 153 """ 154 # Note: we cannot use strftime() because that honors the locale and RFC 155 # 2822 requires that day and month names be the English abbreviations. 156 if timeval is None: 157 timeval = time.time() 158 if localtime or usegmt: 159 dt = datetime.datetime.fromtimestamp(timeval, datetime.timezone.utc) 160 else: 161 dt = datetime.datetime.utcfromtimestamp(timeval) 162 if localtime: 163 dt = dt.astimezone() 164 usegmt = False 165 return format_datetime(dt, usegmt) 166 167 def format_datetime(dt, usegmt=False): 168 """Turn a datetime into a date string as specified in RFC 2822. 169 170 If usegmt is True, dt must be an aware datetime with an offset of zero. In 171 this case 'GMT' will be rendered instead of the normal +0000 required by 172 RFC2822. This is to support HTTP headers involving date stamps. 173 """ 174 now = dt.timetuple() 175 if usegmt: 176 if dt.tzinfo is None or dt.tzinfo != datetime.timezone.utc: 177 raise ValueError("usegmt option requires a UTC datetime") 178 zone = 'GMT' 179 elif dt.tzinfo is None: 180 zone = '-0000' 181 else: 182 zone = dt.strftime("%z") 183 return _format_timetuple_and_zone(now, zone) 184 185 186 def make_msgid(idstring=None, domain=None): 187 """Returns a string suitable for RFC 2822 compliant Message-ID, e.g: 188 189 <142480216486.20800.16526388040877946887 (at] nightshade.la.mastaler.com> 190 191 Optional idstring if given is a string used to strengthen the 192 uniqueness of the message id. Optional domain if given provides the 193 portion of the message id after the '@'. It defaults to the locally 194 defined hostname. 195 """ 196 timeval = int(time.time()*100) 197 pid = os.getpid() 198 randint = random.getrandbits(64) 199 if idstring is None: 200 idstring = '' 201 else: 202 idstring = '.' + idstring 203 if domain is None: 204 domain = socket.getfqdn() 205 msgid = '<%d.%d.%d%s@%s>' % (timeval, pid, randint, idstring, domain) 206 return msgid 207 208 209 def parsedate_to_datetime(data): 210 *dtuple, tz = _parsedate_tz(data) 211 if tz is None: 212 return datetime.datetime(*dtuple[:6]) 213 return datetime.datetime(*dtuple[:6], 214 tzinfo=datetime.timezone(datetime.timedelta(seconds=tz))) 215 216 217 def parseaddr(addr): 218 addrs = _AddressList(addr).addresslist 219 if not addrs: 220 return '', '' 221 return addrs[0] 222 223 224 # rfc822.unquote() doesn't properly de-backslash-ify in Python pre-2.3. 225 def unquote(str): 226 """Remove quotes from a string.""" 227 if len(str) > 1: 228 if str.startswith('"') and str.endswith('"'): 229 return str[1:-1].replace('\\\\', '\\').replace('\\"', '"') 230 if str.startswith('<') and str.endswith('>'): 231 return str[1:-1] 232 return str 233 234 235 236 # RFC2231-related functions - parameter encoding and decoding 237 def decode_rfc2231(s): 238 """Decode string according to RFC 2231""" 239 parts = s.split(TICK, 2) 240 if len(parts) <= 2: 241 return None, None, s 242 return parts 243 244 245 def encode_rfc2231(s, charset=None, language=None): 246 """Encode string according to RFC 2231. 247 248 If neither charset nor language is given, then s is returned as-is. If 249 charset is given but not language, the string is encoded using the empty 250 string for language. 251 """ 252 s = urllib.parse.quote(s, safe='', encoding=charset or 'ascii') 253 if charset is None and language is None: 254 return s 255 if language is None: 256 language = '' 257 return "%s'%s'%s" % (charset, language, s) 258 259 260 rfc2231_continuation = re.compile(r'^(?P<name>\w+)\*((?P<num>[0-9]+)\*?)?$', 261 re.ASCII) 262 263 def decode_params(params): 264 """Decode parameters list according to RFC 2231. 265 266 params is a sequence of 2-tuples containing (param name, string value). 267 """ 268 # Copy params so we don't mess with the original 269 params = params[:] 270 new_params = [] 271 # Map parameter's name to a list of continuations. The values are a 272 # 3-tuple of the continuation number, the string value, and a flag 273 # specifying whether a particular segment is %-encoded. 274 rfc2231_params = {} 275 name, value = params.pop(0) 276 new_params.append((name, value)) 277 while params: 278 name, value = params.pop(0) 279 if name.endswith('*'): 280 encoded = True 281 else: 282 encoded = False 283 value = unquote(value) 284 mo = rfc2231_continuation.match(name) 285 if mo: 286 name, num = mo.group('name', 'num') 287 if num is not None: 288 num = int(num) 289 rfc2231_params.setdefault(name, []).append((num, value, encoded)) 290 else: 291 new_params.append((name, '"%s"' % quote(value))) 292 if rfc2231_params: 293 for name, continuations in rfc2231_params.items(): 294 value = [] 295 extended = False 296 # Sort by number 297 continuations.sort() 298 # And now append all values in numerical order, converting 299 # %-encodings for the encoded segments. If any of the 300 # continuation names ends in a *, then the entire string, after 301 # decoding segments and concatenating, must have the charset and 302 # language specifiers at the beginning of the string. 303 for num, s, encoded in continuations: 304 if encoded: 305 # Decode as "latin-1", so the characters in s directly 306 # represent the percent-encoded octet values. 307 # collapse_rfc2231_value treats this as an octet sequence. 308 s = urllib.parse.unquote(s, encoding="latin-1") 309 extended = True 310 value.append(s) 311 value = quote(EMPTYSTRING.join(value)) 312 if extended: 313 charset, language, value = decode_rfc2231(value) 314 new_params.append((name, (charset, language, '"%s"' % value))) 315 else: 316 new_params.append((name, '"%s"' % value)) 317 return new_params 318 319 def collapse_rfc2231_value(value, errors='replace', 320 fallback_charset='us-ascii'): 321 if not isinstance(value, tuple) or len(value) != 3: 322 return unquote(value) 323 # While value comes to us as a unicode string, we need it to be a bytes 324 # object. We do not want bytes() normal utf-8 decoder, we want a straight 325 # interpretation of the string as character bytes. 326 charset, language, text = value 327 if charset is None: 328 # Issue 17369: if charset/lang is None, decode_rfc2231 couldn't parse 329 # the value, so use the fallback_charset. 330 charset = fallback_charset 331 rawbytes = bytes(text, 'raw-unicode-escape') 332 try: 333 return str(rawbytes, charset, errors) 334 except LookupError: 335 # charset is not a known codec. 336 return unquote(text) 337 338 339 # 340 # datetime doesn't provide a localtime function yet, so provide one. Code 341 # adapted from the patch in issue 9527. This may not be perfect, but it is 342 # better than not having it. 343 # 344 345 def localtime(dt=None, isdst=-1): 346 """Return local time as an aware datetime object. 347 348 If called without arguments, return current time. Otherwise *dt* 349 argument should be a datetime instance, and it is converted to the 350 local time zone according to the system time zone database. If *dt* is 351 naive (that is, dt.tzinfo is None), it is assumed to be in local time. 352 In this case, a positive or zero value for *isdst* causes localtime to 353 presume initially that summer time (for example, Daylight Saving Time) 354 is or is not (respectively) in effect for the specified time. A 355 negative value for *isdst* causes the localtime() function to attempt 356 to divine whether summer time is in effect for the specified time. 357 358 """ 359 if dt is None: 360 return datetime.datetime.now(datetime.timezone.utc).astimezone() 361 if dt.tzinfo is not None: 362 return dt.astimezone() 363 # We have a naive datetime. Convert to a (localtime) timetuple and pass to 364 # system mktime together with the isdst hint. System mktime will return 365 # seconds since epoch. 366 tm = dt.timetuple()[:-1] + (isdst,) 367 seconds = time.mktime(tm) 368 localtm = time.localtime(seconds) 369 try: 370 delta = datetime.timedelta(seconds=localtm.tm_gmtoff) 371 tz = datetime.timezone(delta, localtm.tm_zone) 372 except AttributeError: 373 # Compute UTC offset and compare with the value implied by tm_isdst. 374 # If the values match, use the zone name implied by tm_isdst. 375 delta = dt - datetime.datetime(*time.gmtime(seconds)[:6]) 376 dst = time.daylight and localtm.tm_isdst > 0 377 gmtoff = -(time.altzone if dst else time.timezone) 378 if delta == datetime.timedelta(seconds=gmtoff): 379 tz = datetime.timezone(delta, time.tzname[dst]) 380 else: 381 tz = datetime.timezone(delta) 382 return dt.replace(tzinfo=tz) 383