"""Macintosh binhex compression/decompression.

easy interface:
binhex(inputfilename, outputfilename)
hexbin(inputfilename, outputfilename)
"""

#
# Jack Jansen, CWI, August 1995.
#
# The module is supposed to be as compatible as possible. Especially the
# easy interface should work "as expected" on any platform.
# XXXX Note: currently, textfiles appear in mac-form on all platforms.
# We seem to lack a simple character-translate in python.
# (we should probably use ISO-Latin-1 on all but the mac platform).
# XXXX The simple routines are too simple: they expect to hold the complete
# files in-core. Should be fixed.
# XXXX It would be nice to handle AppleDouble format on unix
# (for servers serving macs).
# XXXX I don't understand what happens when you get 0x90 times the same byte on
# input. The resulting code (xx 90 90) would appear to be interpreted as an
# escaped *value* of 0x90. All coders I've seen appear to ignore this nicety...
#
import io
import os
import struct
import binascii

__all__ = ["binhex","hexbin","Error"]

class Error(Exception):
    """Raised for malformed binhex input and for out-of-order API use."""
    pass

# States (what have we written)
_DID_HEADER = 0
_DID_DATA = 1

# Various constants
REASONABLY_LARGE = 32768    # Minimal amount we pass the rle-coder
LINELEN = 64                # Characters per line of hqx output
RUNCHAR = b"\x90"           # Run-length marker byte in the RLE stream

#
# This code is no longer byte-order dependent


class FInfo:
    """Finder information carried in a binhex header.

    Attributes:
        Type: four-character file type (str or bytes), default '????'.
        Creator: four-character creator code (str or bytes), default '????'.
        Flags: integer flags word, default 0.
    """

    def __init__(self):
        self.Type = '????'
        self.Creator = '????'
        self.Flags = 0

def getfileinfo(name):
    """Collect the header information needed to binhex the file *name*.

    Returns a tuple ``(filename, finfo, data_size, resource_size)``:
    the base name of *name* with its first ':' replaced by '-', an FInfo
    whose Type is 'TEXT' when the first 512 bytes contain no NUL byte,
    the file size in bytes, and a resource size that is always 0.
    """
    finfo = FInfo()
    with io.open(name, 'rb') as fp:
        # Quick check for textfile
        data = fp.read(512)
        if 0 not in data:
            finfo.Type = 'TEXT'
        # Seek to the end to learn the data-fork length.
        fp.seek(0, 2)
        dsize = fp.tell()
    fname = os.path.basename(name)
    fname = fname.replace(':', '-', 1)
    return fname, finfo, dsize, 0

class openrsrc:
    """Stand-in for a resource-fork file object.

    It accepts and ignores any constructor arguments, always reads as
    empty, and discards everything written to it.
    """

    def __init__(self, *args):
        pass

    def read(self, *args):
        """Return b'' regardless of the arguments."""
        return b''

    def write(self, *args):
        """Discard the arguments."""
        pass

    def close(self):
        """Do nothing."""
        pass

class _Hqxcoderengine:
    """Write data to the coder in 3-byte chunks"""

    def __init__(self, ofp):
        """Wrap *ofp*, a binary file object that receives the hqx text."""
        self.ofp = ofp
        self.data = b''       # raw bytes not yet encoded (fewer than 3)
        self.hqxdata = b''    # encoded characters not yet written out
        # The first line is one character shorter than the rest; BinHex
        # writes the leading ':' itself before creating this engine.
        self.linelen = LINELEN - 1

    def write(self, data):
        """Encode as many whole 3-byte groups as possible; keep the rest."""
        self.data = self.data + data
        datalen = len(self.data)
        todo = (datalen // 3) * 3
        data = self.data[:todo]
        self.data = self.data[todo:]
        if not data:
            return
        self.hqxdata = self.hqxdata + binascii.b2a_hqx(data)
        self._flush(0)

    def _flush(self, force):
        """Write out every complete line of encoded text.

        When *force* is true the remaining partial line is written as
        well, followed by the terminating ':' and a newline.
        """
        first = 0
        while first <= len(self.hqxdata) - self.linelen:
            last = first + self.linelen
            self.ofp.write(self.hqxdata[first:last] + b'\n')
            # Only the very first line is short; all later ones are full.
            self.linelen = LINELEN
            first = last
        self.hqxdata = self.hqxdata[first:]
        if force:
            self.ofp.write(self.hqxdata + b':\n')

    def close(self):
        """Encode leftover bytes, flush everything and close the file."""
        if self.data:
            self.hqxdata = self.hqxdata + binascii.b2a_hqx(self.data)
        self._flush(1)
        self.ofp.close()
        del self.ofp

class _Rlecoderengine:
    """Write data to the RLE-coder in suitably large chunks"""

    def __init__(self, ofp):
        """Wrap *ofp*, the object that receives the run-length coded bytes."""
        self.ofp = ofp
        self.data = b''    # bytes buffered until REASONABLY_LARGE is reached

    def write(self, data):
        """Buffer *data*; encode and forward once enough has accumulated."""
        self.data = self.data + data
        if len(self.data) < REASONABLY_LARGE:
            return
        rledata = binascii.rlecode_hqx(self.data)
        self.ofp.write(rledata)
        self.data = b''

    def close(self):
        """Encode and forward any buffered bytes, then close the sink."""
        if self.data:
            rledata = binascii.rlecode_hqx(self.data)
            self.ofp.write(rledata)
        self.ofp.close()
        del self.ofp

class BinHex:
    """Encoder that writes a binhex 4.0 stream.

    Call write() for the data fork, optionally close_data() and
    write_rsrc() for the resource fork, and finally close().
    """

    def __init__(self, name_finfo_dlen_rlen, ofp):
        """Start a binhex stream and emit its header.

        Args:
            name_finfo_dlen_rlen: tuple ``(name, finfo, dlen, rlen)`` such
                as getfileinfo() returns; *finfo* may be None, in which
                case a default FInfo is used.
            ofp: output file name (str) or a writable binary file object.

        Raises:
            Error: if the file name is longer than 63 characters.
        """
        name, finfo, dlen, rlen = name_finfo_dlen_rlen
        close_on_error = False
        if isinstance(ofp, str):
            ofname = ofp
            ofp = io.open(ofname, 'wb')
            close_on_error = True
        try:
            ofp.write(b'(This file must be converted with BinHex 4.0)\r\r:')
            hqxer = _Hqxcoderengine(ofp)
            self.ofp = _Rlecoderengine(hqxer)
            self.crc = 0
            if finfo is None:
                finfo = FInfo()
            self.dlen = dlen
            self.rlen = rlen
            self._writeinfo(name, finfo)
            self.state = _DID_HEADER
        except BaseException:
            # Only close a file we opened ourselves, then re-raise unchanged.
            if close_on_error:
                ofp.close()
            raise

    def _writeinfo(self, name, finfo):
        """Write the header (name, type, creator, flags, lengths) and its CRC."""
        nl = len(name)
        if nl > 63:
            raise Error('Filename too long')
        d = bytes([nl]) + name.encode("latin-1") + b'\0'
        tp, cr = finfo.Type, finfo.Creator
        if isinstance(tp, str):
            tp = tp.encode("latin-1")
        if isinstance(cr, str):
            cr = cr.encode("latin-1")
        d2 = tp + cr

        # Force all structs to be packed with big-endian
        d3 = struct.pack('>h', finfo.Flags)
        d4 = struct.pack('>ii', self.dlen, self.rlen)
        info = d + d2 + d3 + d4
        self._write(info)
        self._writecrc()

    def _write(self, data):
        """Fold *data* into the running CRC and pass it to the coder chain."""
        self.crc = binascii.crc_hqx(data, self.crc)
        self.ofp.write(data)

    def _writecrc(self):
        """Emit the running CRC as two big-endian bytes and reset it."""
        # XXXX Should this be here??
        # self.crc = binascii.crc_hqx('\0\0', self.crc)
        if self.crc < 0:
            fmt = '>h'
        else:
            fmt = '>H'
        self.ofp.write(struct.pack(fmt, self.crc))
        self.crc = 0

    def write(self, data):
        """Write data-fork bytes.

        Raises:
            Error: if the data fork has already been closed.
        """
        if self.state != _DID_HEADER:
            raise Error('Writing data at the wrong time')
        # dlen counts down the bytes still owed to the data fork.
        self.dlen = self.dlen - len(data)
        self._write(data)

    def close_data(self):
        """Finish the data fork and write its CRC.

        Raises:
            Error: if the bytes written do not add up to the announced
                data-fork length.
        """
        if self.dlen != 0:
            # Report the data-fork remainder, which is the value checked.
            raise Error('Incorrect data size, diff=%r' % (self.dlen,))
        self._writecrc()
        self.state = _DID_DATA

    def write_rsrc(self, data):
        """Write resource-fork bytes, closing the data fork first if needed.

        Raises:
            Error: if the stream is not in the resource-fork phase.
        """
        if self.state < _DID_DATA:
            self.close_data()
        if self.state != _DID_DATA:
            raise Error('Writing resource data at the wrong time')
        # rlen counts down the bytes still owed to the resource fork.
        self.rlen = self.rlen - len(data)
        self._write(data)

    def close(self):
        """Finish the stream and close the output; a second call is a no-op.

        Raises:
            Error: if a fork is incomplete or the call is out of order.
                The output is closed even when an error is raised.
        """
        if self.state is None:
            return
        try:
            if self.state < _DID_DATA:
                self.close_data()
            if self.state != _DID_DATA:
                raise Error('Close at the wrong time')
            if self.rlen != 0:
                raise Error("Incorrect resource-datasize, diff=%r" % (self.rlen,))
            self._writecrc()
        finally:
            self.state = None
            ofp = self.ofp
            del self.ofp
            ofp.close()

def binhex(inp, out):
    """binhex(infilename, outfilename): create binhex-encoded copy of a file"""
    finfo = getfileinfo(inp)
    ofp = BinHex(finfo, out)

    with io.open(inp, 'rb') as ifp:
        # XXXX Do textfile translation on non-mac systems
        while True:
            d = ifp.read(128000)
            if not d: break
            ofp.write(d)
        ofp.close_data()

    ifp = openrsrc(inp, 'rb')
    while True:
        d = ifp.read(128000)
        if not d: break
        ofp.write_rsrc(d)
    ofp.close()
    ifp.close()

class _Hqxdecoderengine:
    """Read data via the decoder in 4-byte chunks"""

    def __init__(self, ifp):
        """Wrap *ifp*, a binary file object positioned after the first ':'."""
        self.ifp = ifp
        self.eof = 0    # set from the "done" flag returned by a2b_hqx

    def read(self, totalwtd):
        """Read at least wtd bytes (or until EOF)"""
        decdata = b''
        wtd = totalwtd
        #
        # The loop here is convoluted, since we don't really know how
        # much to decode: there may be newlines in the incoming data.
        while wtd > 0:
            if self.eof: return decdata
            # Every 3 decoded bytes take 4 encoded characters.
            wtd = ((wtd + 2) // 3) * 4
            data = self.ifp.read(wtd)
            #
            # Next problem: there may not be a complete number of
            # bytes in what we pass to a2b. Solve by yet another
            # loop.
            #
            while True:
                try:
                    decdatacur, self.eof = binascii.a2b_hqx(data)
                    break
                except binascii.Incomplete:
                    pass
                newdata = self.ifp.read(1)
                if not newdata:
                    raise Error('Premature EOF on binhex file')
                data = data + newdata
            decdata = decdata + decdatacur
            wtd = totalwtd - len(decdata)
            if not decdata and not self.eof:
                raise Error('Premature EOF on binhex file')
        return decdata

    def close(self):
        """Close the underlying file object."""
        self.ifp.close()

class _Rledecoderengine:
    """Read data via the RLE-coder"""

    def __init__(self, ifp):
        """Wrap *ifp*, an object providing read(n), an eof flag and close()."""
        self.ifp = ifp
        self.pre_buffer = b''     # bytes read but not yet run-length decoded
        self.post_buffer = b''    # decoded bytes not yet handed to the caller
        self.eof = 0

    def read(self, wtd):
        """Return up to *wtd* decoded bytes; fewer only when input ran out."""
        if wtd > len(self.post_buffer):
            self._fill(wtd - len(self.post_buffer))
        rv = self.post_buffer[:wtd]
        self.post_buffer = self.post_buffer[wtd:]
        return rv

    def _fill(self, wtd):
        """Read more input and decode as much of it as is safe."""
        self.pre_buffer = self.pre_buffer + self.ifp.read(wtd + 4)
        if self.ifp.eof:
            # Nothing more will arrive, so the whole buffer can be decoded.
            self.post_buffer = self.post_buffer + \
                binascii.rledecode_hqx(self.pre_buffer)
            self.pre_buffer = b''
            return

        #
        # Obfuscated code ahead. We have to take care that we don't
        # end up with an orphaned RUNCHAR later on. So, we keep a couple
        # of bytes in the buffer, depending on what the end of
        # the buffer looks like:
        # '\220\0\220' - Keep 3 bytes: repeated \220 (escaped as \220\0)
        # '?\220' - Keep 2 bytes: repeated something-else
        # '\220\0' - Escaped \220: Keep 2 bytes.
        # '?\220?' - Complete repeat sequence: decode all
        # otherwise: keep 1 byte.
        #
        mark = len(self.pre_buffer)
        if self.pre_buffer[-3:] == RUNCHAR + b'\0' + RUNCHAR:
            mark = mark - 3
        elif self.pre_buffer[-1:] == RUNCHAR:
            mark = mark - 2
        elif self.pre_buffer[-2:] == RUNCHAR + b'\0':
            mark = mark - 2
        elif self.pre_buffer[-2:-1] == RUNCHAR:
            pass # Decode all
        else:
            mark = mark - 1

        self.post_buffer = self.post_buffer + \
            binascii.rledecode_hqx(self.pre_buffer[:mark])
        self.pre_buffer = self.pre_buffer[mark:]

    def close(self):
        """Close the underlying decoder."""
        self.ifp.close()

class HexBin:
    """Decoder that reads a binhex 4.0 stream.

    After construction FName, FInfo, dlen and rlen hold the header
    values. Call read() for the data fork, optionally close_data() and
    read_rsrc() for the resource fork, and finally close().
    """

    def __init__(self, ifp):
        """Open a binhex stream and parse its header.

        Args:
            ifp: input file name (str) or a readable binary file object.

        Raises:
            Error: if no ':' start marker is found or the header CRC is
                wrong. A file opened here from a name is closed again
                before the exception propagates.
        """
        close_on_error = False
        if isinstance(ifp, str):
            ifp = io.open(ifp, 'rb')
            close_on_error = True
        try:
            #
            # Find initial colon.
            #
            while True:
                ch = ifp.read(1)
                if not ch:
                    raise Error("No binhex data found")
                # Cater for \r\n terminated lines (which show up as \n\r, hence
                # all lines start with \r)
                if ch == b'\r':
                    continue
                if ch == b':':
                    break

            hqxifp = _Hqxdecoderengine(ifp)
            self.ifp = _Rledecoderengine(hqxifp)
            self.crc = 0
            self._readheader()
        except BaseException:
            # Only close a file we opened ourselves, then re-raise unchanged.
            if close_on_error:
                ifp.close()
            raise

    def _read(self, len):
        """Read *len* decoded bytes and fold them into the running CRC."""
        data = self.ifp.read(len)
        self.crc = binascii.crc_hqx(data, self.crc)
        return data

    def _checkcrc(self):
        """Compare the two-byte CRC in the stream with the running CRC.

        Raises:
            Error: if the values differ.
        """
        filecrc = struct.unpack('>h', self.ifp.read(2))[0] & 0xffff
        #self.crc = binascii.crc_hqx('\0\0', self.crc)
        # XXXX Is this needed??
        self.crc = self.crc & 0xffff
        if filecrc != self.crc:
            raise Error('CRC error, computed %x, read %x'
                        % (self.crc, filecrc))
        self.crc = 0

    def _readheader(self):
        """Parse the header into FName, FInfo, dlen and rlen."""
        namelen = self._read(1)
        fname = self._read(ord(namelen))
        # NUL byte + type + creator + flags + data length + resource length
        rest = self._read(1 + 4 + 4 + 2 + 4 + 4)
        self._checkcrc()

        ftype = rest[1:5]
        creator = rest[5:9]
        flags = struct.unpack('>h', rest[9:11])[0]
        self.dlen = struct.unpack('>l', rest[11:15])[0]
        self.rlen = struct.unpack('>l', rest[15:19])[0]

        self.FName = fname
        self.FInfo = FInfo()
        self.FInfo.Creator = creator
        self.FInfo.Type = ftype
        self.FInfo.Flags = flags

        self.state = _DID_HEADER

    def read(self, *n):
        """Read data-fork bytes.

        With an argument, at most that many bytes are returned; without
        one, the whole remaining data fork is returned.

        Raises:
            Error: if called outside the data-fork phase, or if the
                input ends before the requested bytes were delivered.
        """
        if self.state != _DID_HEADER:
            raise Error('Read data at wrong time')
        if n:
            n = n[0]
            n = min(n, self.dlen)
        else:
            n = self.dlen
        rv = b''
        while len(rv) < n:
            chunk = self._read(n - len(rv))
            if not chunk:
                # The decoder has nothing left; without this check the
                # loop would never terminate on a truncated data fork.
                raise Error('Premature EOF on binhex file')
            rv = rv + chunk
        self.dlen = self.dlen - n
        return rv

    def close_data(self):
        """Skip any unread data-fork bytes and verify the data-fork CRC.

        Raises:
            Error: if called outside the data-fork phase or on CRC mismatch.
        """
        if self.state != _DID_HEADER:
            raise Error('close_data at wrong time')
        if self.dlen:
            # Consume the remainder so it still contributes to the CRC.
            self._read(self.dlen)
        self._checkcrc()
        self.state = _DID_DATA

    def read_rsrc(self, *n):
        """Read resource-fork bytes, closing the data fork first if needed.

        With an argument, at most that many bytes are requested; without
        one, the whole remaining resource fork is requested.

        Raises:
            Error: if the stream is not in the resource-fork phase.
        """
        if self.state == _DID_HEADER:
            self.close_data()
        if self.state != _DID_DATA:
            raise Error('Read resource data at wrong time')
        if n:
            n = n[0]
            n = min(n, self.rlen)
        else:
            n = self.rlen
        self.rlen = self.rlen - n
        return self._read(n)

    def close(self):
        """Verify the final CRC and close the input; a second call is a no-op.

        The input is closed even when an error is raised.
        """
        if self.state is None:
            return
        try:
            if self.rlen:
                # Consume the remainder so it still contributes to the CRC.
                self.read_rsrc(self.rlen)
            self._checkcrc()
        finally:
            self.state = None
            self.ifp.close()

def hexbin(inp, out):
    """hexbin(infilename, outfilename) - Decode binhexed file"""
    ifp = HexBin(inp)
    if not out:
        out = ifp.FName

    with io.open(out, 'wb') as ofp:
        # XXXX Do translation on non-mac systems
        while True:
            d = ifp.read(128000)
            if not d: break
            ofp.write(d)
    ifp.close_data()

    d = ifp.read_rsrc(128000)
    if d:
        ofp = openrsrc(out, 'wb')
        ofp.write(d)
        while True:
            d = ifp.read_rsrc(128000)
            if not d: break
            ofp.write(d)
        ofp.close()

    ifp.close()