Home | History | Annotate | Download | only in Lib
      1 """Macintosh binhex compression/decompression.
      2 
      3 easy interface:
      4 binhex(inputfilename, outputfilename)
      5 hexbin(inputfilename, outputfilename)
      6 """
      7 
      8 #
      9 # Jack Jansen, CWI, August 1995.
     10 #
     11 # The module is supposed to be as compatible as possible. Especially the
     12 # easy interface should work "as expected" on any platform.
     13 # XXXX Note: currently, textfiles appear in mac-form on all platforms.
     14 # We seem to lack a simple character-translate in python.
     15 # (we should probably use ISO-Latin-1 on all but the mac platform).
     16 # XXXX The simple routines are too simple: they expect to hold the complete
     17 # files in-core. Should be fixed.
     18 # XXXX It would be nice to handle AppleDouble format on unix
     19 # (for servers serving macs).
     20 # XXXX I don't understand what happens when you get 0x90 times the same byte on
     21 # input. The resulting code (xx 90 90) would appear to be interpreted as an
     22 # escaped *value* of 0x90. All coders I've seen appear to ignore this nicety...
     23 #
     24 import io
     25 import os
     26 import struct
     27 import binascii
     28 
# Names exported by "from binhex import *": the two convenience functions
# and the exception class this module raises.
__all__ = ["binhex","hexbin","Error"]
     30 
     31 class Error(Exception):
     32     pass
     33 
# States (what have we written).  BinHex and HexBin both store one of these
# in their ``state`` attribute and set it to None once closed.
_DID_HEADER = 0  # the header is done; the data section comes next
_DID_DATA = 1  # the data section is done; the resource section comes next

# Various constants
REASONABLY_LARGE = 32768  # Minimal amount we pass the rle-coder
LINELEN = 64  # number of encoded bytes per full output line (newline excluded)
RUNCHAR = b"\x90"  # marker byte the RLE decoder watches for at buffer ends
     42 
     43 #
     44 # This code is no longer byte-order dependent
     45 
     46 
     47 class FInfo:
     48     def __init__(self):
     49         self.Type = '????'
     50         self.Creator = '????'
     51         self.Flags = 0
     52 
     53 def getfileinfo(name):
     54     finfo = FInfo()
     55     with io.open(name, 'rb') as fp:
     56         # Quick check for textfile
     57         data = fp.read(512)
     58         if 0 not in data:
     59             finfo.Type = 'TEXT'
     60         fp.seek(0, 2)
     61         dsize = fp.tell()
     62     dir, file = os.path.split(name)
     63     file = file.replace(':', '-', 1)
     64     return file, finfo, dsize, 0
     65 
     66 class openrsrc:
     67     def __init__(self, *args):
     68         pass
     69 
     70     def read(self, *args):
     71         return b''
     72 
     73     def write(self, *args):
     74         pass
     75 
     76     def close(self):
     77         pass
     78 
     79 class _Hqxcoderengine:
     80     """Write data to the coder in 3-byte chunks"""
     81 
     82     def __init__(self, ofp):
     83         self.ofp = ofp
     84         self.data = b''
     85         self.hqxdata = b''
     86         self.linelen = LINELEN - 1
     87 
     88     def write(self, data):
     89         self.data = self.data + data
     90         datalen = len(self.data)
     91         todo = (datalen // 3) * 3
     92         data = self.data[:todo]
     93         self.data = self.data[todo:]
     94         if not data:
     95             return
     96         self.hqxdata = self.hqxdata + binascii.b2a_hqx(data)
     97         self._flush(0)
     98 
     99     def _flush(self, force):
    100         first = 0
    101         while first <= len(self.hqxdata) - self.linelen:
    102             last = first + self.linelen
    103             self.ofp.write(self.hqxdata[first:last] + b'\n')
    104             self.linelen = LINELEN
    105             first = last
    106         self.hqxdata = self.hqxdata[first:]
    107         if force:
    108             self.ofp.write(self.hqxdata + b':\n')
    109 
    110     def close(self):
    111         if self.data:
    112             self.hqxdata = self.hqxdata + binascii.b2a_hqx(self.data)
    113         self._flush(1)
    114         self.ofp.close()
    115         del self.ofp
    116 
    117 class _Rlecoderengine:
    118     """Write data to the RLE-coder in suitably large chunks"""
    119 
    120     def __init__(self, ofp):
    121         self.ofp = ofp
    122         self.data = b''
    123 
    124     def write(self, data):
    125         self.data = self.data + data
    126         if len(self.data) < REASONABLY_LARGE:
    127             return
    128         rledata = binascii.rlecode_hqx(self.data)
    129         self.ofp.write(rledata)
    130         self.data = b''
    131 
    132     def close(self):
    133         if self.data:
    134             rledata = binascii.rlecode_hqx(self.data)
    135             self.ofp.write(rledata)
    136         self.ofp.close()
    137         del self.ofp
    138 
    139 class BinHex:
    140     def __init__(self, name_finfo_dlen_rlen, ofp):
    141         name, finfo, dlen, rlen = name_finfo_dlen_rlen
    142         close_on_error = False
    143         if isinstance(ofp, str):
    144             ofname = ofp
    145             ofp = io.open(ofname, 'wb')
    146             close_on_error = True
    147         try:
    148             ofp.write(b'(This file must be converted with BinHex 4.0)\r\r:')
    149             hqxer = _Hqxcoderengine(ofp)
    150             self.ofp = _Rlecoderengine(hqxer)
    151             self.crc = 0
    152             if finfo is None:
    153                 finfo = FInfo()
    154             self.dlen = dlen
    155             self.rlen = rlen
    156             self._writeinfo(name, finfo)
    157             self.state = _DID_HEADER
    158         except:
    159             if close_on_error:
    160                 ofp.close()
    161             raise
    162 
    163     def _writeinfo(self, name, finfo):
    164         nl = len(name)
    165         if nl > 63:
    166             raise Error('Filename too long')
    167         d = bytes([nl]) + name.encode("latin-1") + b'\0'
    168         tp, cr = finfo.Type, finfo.Creator
    169         if isinstance(tp, str):
    170             tp = tp.encode("latin-1")
    171         if isinstance(cr, str):
    172             cr = cr.encode("latin-1")
    173         d2 = tp + cr
    174 
    175         # Force all structs to be packed with big-endian
    176         d3 = struct.pack('>h', finfo.Flags)
    177         d4 = struct.pack('>ii', self.dlen, self.rlen)
    178         info = d + d2 + d3 + d4
    179         self._write(info)
    180         self._writecrc()
    181 
    182     def _write(self, data):
    183         self.crc = binascii.crc_hqx(data, self.crc)
    184         self.ofp.write(data)
    185 
    186     def _writecrc(self):
    187         # XXXX Should this be here??
    188         # self.crc = binascii.crc_hqx('\0\0', self.crc)
    189         if self.crc < 0:
    190             fmt = '>h'
    191         else:
    192             fmt = '>H'
    193         self.ofp.write(struct.pack(fmt, self.crc))
    194         self.crc = 0
    195 
    196     def write(self, data):
    197         if self.state != _DID_HEADER:
    198             raise Error('Writing data at the wrong time')
    199         self.dlen = self.dlen - len(data)
    200         self._write(data)
    201 
    202     def close_data(self):
    203         if self.dlen != 0:
    204             raise Error('Incorrect data size, diff=%r' % (self.rlen,))
    205         self._writecrc()
    206         self.state = _DID_DATA
    207 
    208     def write_rsrc(self, data):
    209         if self.state < _DID_DATA:
    210             self.close_data()
    211         if self.state != _DID_DATA:
    212             raise Error('Writing resource data at the wrong time')
    213         self.rlen = self.rlen - len(data)
    214         self._write(data)
    215 
    216     def close(self):
    217         if self.state is None:
    218             return
    219         try:
    220             if self.state < _DID_DATA:
    221                 self.close_data()
    222             if self.state != _DID_DATA:
    223                 raise Error('Close at the wrong time')
    224             if self.rlen != 0:
    225                 raise Error("Incorrect resource-datasize, diff=%r" % (self.rlen,))
    226             self._writecrc()
    227         finally:
    228             self.state = None
    229             ofp = self.ofp
    230             del self.ofp
    231             ofp.close()
    232 
    233 def binhex(inp, out):
    234     """binhex(infilename, outfilename): create binhex-encoded copy of a file"""
    235     finfo = getfileinfo(inp)
    236     ofp = BinHex(finfo, out)
    237 
    238     with io.open(inp, 'rb') as ifp:
    239         # XXXX Do textfile translation on non-mac systems
    240         while True:
    241             d = ifp.read(128000)
    242             if not d: break
    243             ofp.write(d)
    244         ofp.close_data()
    245 
    246     ifp = openrsrc(inp, 'rb')
    247     while True:
    248         d = ifp.read(128000)
    249         if not d: break
    250         ofp.write_rsrc(d)
    251     ofp.close()
    252     ifp.close()
    253 
    254 class _Hqxdecoderengine:
    255     """Read data via the decoder in 4-byte chunks"""
    256 
    257     def __init__(self, ifp):
    258         self.ifp = ifp
    259         self.eof = 0
    260 
    261     def read(self, totalwtd):
    262         """Read at least wtd bytes (or until EOF)"""
    263         decdata = b''
    264         wtd = totalwtd
    265         #
    266         # The loop here is convoluted, since we don't really now how
    267         # much to decode: there may be newlines in the incoming data.
    268         while wtd > 0:
    269             if self.eof: return decdata
    270             wtd = ((wtd + 2) // 3) * 4
    271             data = self.ifp.read(wtd)
    272             #
    273             # Next problem: there may not be a complete number of
    274             # bytes in what we pass to a2b. Solve by yet another
    275             # loop.
    276             #
    277             while True:
    278                 try:
    279                     decdatacur, self.eof = binascii.a2b_hqx(data)
    280                     break
    281                 except binascii.Incomplete:
    282                     pass
    283                 newdata = self.ifp.read(1)
    284                 if not newdata:
    285                     raise Error('Premature EOF on binhex file')
    286                 data = data + newdata
    287             decdata = decdata + decdatacur
    288             wtd = totalwtd - len(decdata)
    289             if not decdata and not self.eof:
    290                 raise Error('Premature EOF on binhex file')
    291         return decdata
    292 
    293     def close(self):
    294         self.ifp.close()
    295 
class _Rledecoderengine:
    """Read data via the RLE-coder

    Sits on top of another reader (``ifp``), which must provide read(),
    close() and an ``eof`` attribute, and undoes the run-length encoding
    of the bytes it delivers.
    """

    def __init__(self, ifp):
        self.ifp = ifp
        # Bytes fetched from ifp that have not been RLE-decoded yet.
        self.pre_buffer = b''
        # Decoded bytes that have not been handed to the caller yet.
        self.post_buffer = b''
        # Initialised here; nothing in this class changes it afterwards.
        self.eof = 0

    def read(self, wtd):
        """Return up to *wtd* decoded bytes, refilling the buffer if needed.

        Fewer bytes than requested are returned when the refill did not
        produce enough decoded data.
        """
        if wtd > len(self.post_buffer):
            self._fill(wtd - len(self.post_buffer))
        rv = self.post_buffer[:wtd]
        self.post_buffer = self.post_buffer[wtd:]
        return rv

    def _fill(self, wtd):
        """Fetch more encoded input and decode as much of it as is safe."""
        # Ask for a few bytes more than wanted, since some bytes at the
        # end may have to be held back (see below).
        self.pre_buffer = self.pre_buffer + self.ifp.read(wtd + 4)
        if self.ifp.eof:
            # No more input will follow, so everything can be decoded.
            self.post_buffer = self.post_buffer + \
                binascii.rledecode_hqx(self.pre_buffer)
            self.pre_buffer = b''
            return

        #
        # Obfuscated code ahead. We have to take care that we don't
        # end up with an orphaned RUNCHAR later on. So, we keep a couple
        # of bytes in the buffer, depending on what the end of
        # the buffer looks like:
        # '\220\0\220' - Keep 3 bytes: repeated \220 (escaped as \220\0)
        # '?\220' - Keep 2 bytes: repeated something-else
        # '\220\0' - Escaped \220: Keep 2 bytes.
        # '?\220?' - Complete repeat sequence: decode all
        # otherwise: keep 1 byte.
        #
        # ('\220' is the octal spelling of RUNCHAR, 0x90.)  The order of
        # the tests below matters: the longest pattern is checked first.
        #
        mark = len(self.pre_buffer)
        if self.pre_buffer[-3:] == RUNCHAR + b'\0' + RUNCHAR:
            mark = mark - 3
        elif self.pre_buffer[-1:] == RUNCHAR:
            mark = mark - 2
        elif self.pre_buffer[-2:] == RUNCHAR + b'\0':
            mark = mark - 2
        elif self.pre_buffer[-2:-1] == RUNCHAR:
            pass # Decode all
        else:
            mark = mark - 1

        # Decode everything before the mark; the tail stays buffered until
        # the next call supplies the bytes that follow it.
        self.post_buffer = self.post_buffer + \
            binascii.rledecode_hqx(self.pre_buffer[:mark])
        self.pre_buffer = self.pre_buffer[mark:]

    def close(self):
        """Close the underlying reader."""
        self.ifp.close()
    349 
    350 class HexBin:
    351     def __init__(self, ifp):
    352         if isinstance(ifp, str):
    353             ifp = io.open(ifp, 'rb')
    354         #
    355         # Find initial colon.
    356         #
    357         while True:
    358             ch = ifp.read(1)
    359             if not ch:
    360                 raise Error("No binhex data found")
    361             # Cater for \r\n terminated lines (which show up as \n\r, hence
    362             # all lines start with \r)
    363             if ch == b'\r':
    364                 continue
    365             if ch == b':':
    366                 break
    367 
    368         hqxifp = _Hqxdecoderengine(ifp)
    369         self.ifp = _Rledecoderengine(hqxifp)
    370         self.crc = 0
    371         self._readheader()
    372 
    373     def _read(self, len):
    374         data = self.ifp.read(len)
    375         self.crc = binascii.crc_hqx(data, self.crc)
    376         return data
    377 
    378     def _checkcrc(self):
    379         filecrc = struct.unpack('>h', self.ifp.read(2))[0] & 0xffff
    380         #self.crc = binascii.crc_hqx('\0\0', self.crc)
    381         # XXXX Is this needed??
    382         self.crc = self.crc & 0xffff
    383         if filecrc != self.crc:
    384             raise Error('CRC error, computed %x, read %x'
    385                         % (self.crc, filecrc))
    386         self.crc = 0
    387 
    388     def _readheader(self):
    389         len = self._read(1)
    390         fname = self._read(ord(len))
    391         rest = self._read(1 + 4 + 4 + 2 + 4 + 4)
    392         self._checkcrc()
    393 
    394         type = rest[1:5]
    395         creator = rest[5:9]
    396         flags = struct.unpack('>h', rest[9:11])[0]
    397         self.dlen = struct.unpack('>l', rest[11:15])[0]
    398         self.rlen = struct.unpack('>l', rest[15:19])[0]
    399 
    400         self.FName = fname
    401         self.FInfo = FInfo()
    402         self.FInfo.Creator = creator
    403         self.FInfo.Type = type
    404         self.FInfo.Flags = flags
    405 
    406         self.state = _DID_HEADER
    407 
    408     def read(self, *n):
    409         if self.state != _DID_HEADER:
    410             raise Error('Read data at wrong time')
    411         if n:
    412             n = n[0]
    413             n = min(n, self.dlen)
    414         else:
    415             n = self.dlen
    416         rv = b''
    417         while len(rv) < n:
    418             rv = rv + self._read(n-len(rv))
    419         self.dlen = self.dlen - n
    420         return rv
    421 
    422     def close_data(self):
    423         if self.state != _DID_HEADER:
    424             raise Error('close_data at wrong time')
    425         if self.dlen:
    426             dummy = self._read(self.dlen)
    427         self._checkcrc()
    428         self.state = _DID_DATA
    429 
    430     def read_rsrc(self, *n):
    431         if self.state == _DID_HEADER:
    432             self.close_data()
    433         if self.state != _DID_DATA:
    434             raise Error('Read resource data at wrong time')
    435         if n:
    436             n = n[0]
    437             n = min(n, self.rlen)
    438         else:
    439             n = self.rlen
    440         self.rlen = self.rlen - n
    441         return self._read(n)
    442 
    443     def close(self):
    444         if self.state is None:
    445             return
    446         try:
    447             if self.rlen:
    448                 dummy = self.read_rsrc(self.rlen)
    449             self._checkcrc()
    450         finally:
    451             self.state = None
    452             self.ifp.close()
    453 
    454 def hexbin(inp, out):
    455     """hexbin(infilename, outfilename) - Decode binhexed file"""
    456     ifp = HexBin(inp)
    457     finfo = ifp.FInfo
    458     if not out:
    459         out = ifp.FName
    460 
    461     with io.open(out, 'wb') as ofp:
    462         # XXXX Do translation on non-mac systems
    463         while True:
    464             d = ifp.read(128000)
    465             if not d: break
    466             ofp.write(d)
    467     ifp.close_data()
    468 
    469     d = ifp.read_rsrc(128000)
    470     if d:
    471         ofp = openrsrc(out, 'wb')
    472         ofp.write(d)
    473         while True:
    474             d = ifp.read_rsrc(128000)
    475             if not d: break
    476             ofp.write(d)
    477         ofp.close()
    478 
    479     ifp.close()
    480