1 """Various tools used by MIME-reading or MIME-writing programs.""" 2 3 4 import os 5 import sys 6 import tempfile 7 from warnings import filterwarnings, catch_warnings 8 with catch_warnings(): 9 if sys.py3kwarning: 10 filterwarnings("ignore", ".*rfc822 has been removed", DeprecationWarning) 11 import rfc822 12 13 from warnings import warnpy3k 14 warnpy3k("in 3.x, mimetools has been removed in favor of the email package", 15 stacklevel=2) 16 17 __all__ = ["Message","choose_boundary","encode","decode","copyliteral", 18 "copybinary"] 19 20 class Message(rfc822.Message): 21 """A derived class of rfc822.Message that knows about MIME headers and 22 contains some hooks for decoding encoded and multipart messages.""" 23 24 def __init__(self, fp, seekable = 1): 25 rfc822.Message.__init__(self, fp, seekable) 26 self.encodingheader = \ 27 self.getheader('content-transfer-encoding') 28 self.typeheader = \ 29 self.getheader('content-type') 30 self.parsetype() 31 self.parseplist() 32 33 def parsetype(self): 34 str = self.typeheader 35 if str is None: 36 str = 'text/plain' 37 if ';' in str: 38 i = str.index(';') 39 self.plisttext = str[i:] 40 str = str[:i] 41 else: 42 self.plisttext = '' 43 fields = str.split('/') 44 for i in range(len(fields)): 45 fields[i] = fields[i].strip().lower() 46 self.type = '/'.join(fields) 47 self.maintype = fields[0] 48 self.subtype = '/'.join(fields[1:]) 49 50 def parseplist(self): 51 str = self.plisttext 52 self.plist = [] 53 while str[:1] == ';': 54 str = str[1:] 55 if ';' in str: 56 # XXX Should parse quotes! 57 end = str.index(';') 58 else: 59 end = len(str) 60 f = str[:end] 61 if '=' in f: 62 i = f.index('=') 63 f = f[:i].strip().lower() + \ 64 '=' + f[i+1:].strip() 65 self.plist.append(f.strip()) 66 str = str[end:] 67 68 def getplist(self): 69 return self.plist 70 71 def getparam(self, name): 72 name = name.lower() + '=' 73 n = len(name) 74 for p in self.plist: 75 if p[:n] == name: 76 return rfc822.unquote(p[n:]) 77 return None 78 79 def getparamnames(self): 80 result = [] 81 for p in self.plist: 82 i = p.find('=') 83 if i >= 0: 84 result.append(p[:i].lower()) 85 return result 86 87 def getencoding(self): 88 if self.encodingheader is None: 89 return '7bit' 90 return self.encodingheader.lower() 91 92 def gettype(self): 93 return self.type 94 95 def getmaintype(self): 96 return self.maintype 97 98 def getsubtype(self): 99 return self.subtype 100 101 102 103 104 # Utility functions 105 # ----------------- 106 107 try: 108 import thread 109 except ImportError: 110 import dummy_thread as thread 111 _counter_lock = thread.allocate_lock() 112 del thread 113 114 _counter = 0 115 def _get_next_counter(): 116 global _counter 117 _counter_lock.acquire() 118 _counter += 1 119 result = _counter 120 _counter_lock.release() 121 return result 122 123 _prefix = None 124 125 def choose_boundary(): 126 """Return a string usable as a multipart boundary. 127 128 The string chosen is unique within a single program run, and 129 incorporates the user id (if available), process id (if available), 130 and current time. So it's very unlikely the returned string appears 131 in message text, but there's no guarantee. 132 133 The boundary contains dots so you have to quote it in the header.""" 134 135 global _prefix 136 import time 137 if _prefix is None: 138 import socket 139 try: 140 hostid = socket.gethostbyname(socket.gethostname()) 141 except socket.gaierror: 142 hostid = '127.0.0.1' 143 try: 144 uid = repr(os.getuid()) 145 except AttributeError: 146 uid = '1' 147 try: 148 pid = repr(os.getpid()) 149 except AttributeError: 150 pid = '1' 151 _prefix = hostid + '.' + uid + '.' + pid 152 return "%s.%.3f.%d" % (_prefix, time.time(), _get_next_counter()) 153 154 155 # Subroutines for decoding some common content-transfer-types 156 157 def decode(input, output, encoding): 158 """Decode common content-transfer-encodings (base64, quopri, uuencode).""" 159 if encoding == 'base64': 160 import base64 161 return base64.decode(input, output) 162 if encoding == 'quoted-printable': 163 import quopri 164 return quopri.decode(input, output) 165 if encoding in ('uuencode', 'x-uuencode', 'uue', 'x-uue'): 166 import uu 167 return uu.decode(input, output) 168 if encoding in ('7bit', '8bit'): 169 return output.write(input.read()) 170 if encoding in decodetab: 171 pipethrough(input, decodetab[encoding], output) 172 else: 173 raise ValueError, \ 174 'unknown Content-Transfer-Encoding: %s' % encoding 175 176 def encode(input, output, encoding): 177 """Encode common content-transfer-encodings (base64, quopri, uuencode).""" 178 if encoding == 'base64': 179 import base64 180 return base64.encode(input, output) 181 if encoding == 'quoted-printable': 182 import quopri 183 return quopri.encode(input, output, 0) 184 if encoding in ('uuencode', 'x-uuencode', 'uue', 'x-uue'): 185 import uu 186 return uu.encode(input, output) 187 if encoding in ('7bit', '8bit'): 188 return output.write(input.read()) 189 if encoding in encodetab: 190 pipethrough(input, encodetab[encoding], output) 191 else: 192 raise ValueError, \ 193 'unknown Content-Transfer-Encoding: %s' % encoding 194 195 # The following is no longer used for standard encodings 196 197 # XXX This requires that uudecode and mmencode are in $PATH 198 199 uudecode_pipe = '''( 200 TEMP=/tmp/@uu.$$ 201 sed "s%^begin [0-7][0-7]* .*%begin 600 $TEMP%" | uudecode 202 cat $TEMP 203 rm $TEMP 204 )''' 205 206 decodetab = { 207 'uuencode': uudecode_pipe, 208 'x-uuencode': uudecode_pipe, 209 'uue': uudecode_pipe, 210 'x-uue': uudecode_pipe, 211 'quoted-printable': 'mmencode -u -q', 212 'base64': 'mmencode -u -b', 213 } 214 215 encodetab = { 216 'x-uuencode': 'uuencode tempfile', 217 'uuencode': 'uuencode tempfile', 218 'x-uue': 'uuencode tempfile', 219 'uue': 'uuencode tempfile', 220 'quoted-printable': 'mmencode -q', 221 'base64': 'mmencode -b', 222 } 223 224 def pipeto(input, command): 225 pipe = os.popen(command, 'w') 226 copyliteral(input, pipe) 227 pipe.close() 228 229 def pipethrough(input, command, output): 230 (fd, tempname) = tempfile.mkstemp() 231 temp = os.fdopen(fd, 'w') 232 copyliteral(input, temp) 233 temp.close() 234 pipe = os.popen(command + ' <' + tempname, 'r') 235 copybinary(pipe, output) 236 pipe.close() 237 os.unlink(tempname) 238 239 def copyliteral(input, output): 240 while 1: 241 line = input.readline() 242 if not line: break 243 output.write(line) 244 245 def copybinary(input, output): 246 BUFSIZE = 8192 247 while 1: 248 line = input.read(BUFSIZE) 249 if not line: break 250 output.write(line) 251