Home | History | Annotate | Download | only in Lib
      1 """Stuff to parse Sun and NeXT audio files.
      2 
      3 An audio file consists of a header followed by the data.  The structure
      4 of the header is as follows.
      5 
      6         +---------------+
      7         | magic word    |
      8         +---------------+
      9         | header size   |
     10         +---------------+
     11         | data size     |
     12         +---------------+
     13         | encoding      |
     14         +---------------+
     15         | sample rate   |
     16         +---------------+
     17         | # of channels |
     18         +---------------+
     19         | info          |
     20         |               |
     21         +---------------+
     22 
     23 The magic word consists of the 4 characters '.snd'.  Apart from the
     24 info field, all header fields are 4 bytes in size.  They are all
     25 32-bit unsigned integers encoded in big-endian byte order.
     26 
     27 The header size really gives the start of the data.
     28 The data size is the physical size of the data.  From the other
     29 parameters the number of frames can be calculated.
     30 The encoding gives the way in which audio samples are encoded.
     31 Possible values are listed below.
     32 The info field currently consists of an ASCII string giving a
     33 human-readable description of the audio file.  The info field is
     34 padded with NUL bytes to the header size.
     35 
     36 Usage.
     37 
     38 Reading audio files:
     39         f = sunau.open(file, 'r')
     40 where file is either the name of a file or an open file pointer.
     41 The open file pointer must have methods read(), seek(), and close().
     42 When the setpos() and rewind() methods are not used, the seek()
     43 method is not necessary.
     44 
     45 This returns an instance of a class with the following public methods:
     46         getnchannels()  -- returns number of audio channels (1 for
     47                            mono, 2 for stereo)
     48         getsampwidth()  -- returns sample width in bytes
     49         getframerate()  -- returns sampling frequency
     50         getnframes()    -- returns number of audio frames
     51         getcomptype()   -- returns compression type ('NONE', 'ULAW' or 'ALAW')
     52         getcompname()   -- returns human-readable version of
     53                            compression type ('not compressed' matches 'NONE')
     54         getparams()     -- returns a namedtuple consisting of all of the
     55                            above in the above order
     56         getmarkers()    -- returns None (for compatibility with the
     57                            aifc module)
     58         getmark(id)     -- raises an error since the mark does not
     59                            exist (for compatibility with the aifc module)
     60         readframes(n)   -- returns at most n frames of audio
     61         rewind()        -- rewind to the beginning of the audio stream
     62         setpos(pos)     -- seek to the specified position
     63         tell()          -- return the current position
     64         close()         -- close the instance (make it unusable)
     65 The position returned by tell() and the position given to setpos()
     66 are compatible and have nothing to do with the actual position in the
     67 file.
     68 The close() method is called automatically when the class instance
     69 is destroyed.
     70 
     71 Writing audio files:
     72         f = sunau.open(file, 'w')
     73 where file is either the name of a file or an open file pointer.
     74 The open file pointer must have methods write(), tell(), seek(), and
     75 close().
     76 
     77 This returns an instance of a class with the following public methods:
     78         setnchannels(n) -- set the number of channels
     79         setsampwidth(n) -- set the sample width
     80         setframerate(n) -- set the frame rate
     81         setnframes(n)   -- set the number of frames
     82         setcomptype(type, name)
     83                         -- set the compression type and the
     84                            human-readable compression type
     85         setparams(tuple)-- set all parameters at once
     86         tell()          -- return current position in output file
     87         writeframesraw(data)
     88                         -- write audio frames without patching up the
     89                            file header
     90         writeframes(data)
     91                         -- write audio frames and patch up the file header
     92         close()         -- patch up the file header and close the
     93                            output file
     94 You should set the parameters before the first writeframesraw or
     95 writeframes.  The total number of frames does not need to be set,
     96 but when it is set to the correct value, the header does not have to
     97 be patched up.
     98 It is best to first set all parameters (the compression type is
     99 optional and defaults to 'ULAW'), and then write audio frames using writeframesraw.
    100 When all frames have been written, either call writeframes(b'') or
    101 close() to patch up the sizes in the header.
    102 The close() method is called automatically when the class instance
    103 is destroyed.
    104 """
    105 
    106 from collections import namedtuple
    107 
    108 _sunau_params = namedtuple('_sunau_params',
    109                            'nchannels sampwidth framerate nframes comptype compname')
    110 
    111 # from <multimedia/audio_filehdr.h>
    112 AUDIO_FILE_MAGIC = 0x2e736e64
    113 AUDIO_FILE_ENCODING_MULAW_8 = 1
    114 AUDIO_FILE_ENCODING_LINEAR_8 = 2
    115 AUDIO_FILE_ENCODING_LINEAR_16 = 3
    116 AUDIO_FILE_ENCODING_LINEAR_24 = 4
    117 AUDIO_FILE_ENCODING_LINEAR_32 = 5
    118 AUDIO_FILE_ENCODING_FLOAT = 6
    119 AUDIO_FILE_ENCODING_DOUBLE = 7
    120 AUDIO_FILE_ENCODING_ADPCM_G721 = 23
    121 AUDIO_FILE_ENCODING_ADPCM_G722 = 24
    122 AUDIO_FILE_ENCODING_ADPCM_G723_3 = 25
    123 AUDIO_FILE_ENCODING_ADPCM_G723_5 = 26
    124 AUDIO_FILE_ENCODING_ALAW_8 = 27
    125 
    126 # from <multimedia/audio_hdr.h>
    127 AUDIO_UNKNOWN_SIZE = 0xFFFFFFFF        # ((unsigned)(~0))
    128 
    129 _simple_encodings = [AUDIO_FILE_ENCODING_MULAW_8,
    130                      AUDIO_FILE_ENCODING_LINEAR_8,
    131                      AUDIO_FILE_ENCODING_LINEAR_16,
    132                      AUDIO_FILE_ENCODING_LINEAR_24,
    133                      AUDIO_FILE_ENCODING_LINEAR_32,
    134                      AUDIO_FILE_ENCODING_ALAW_8]
    135 
    136 class Error(Exception):
    137     pass
    138 
    139 def _read_u32(file):
    140     x = 0
    141     for i in range(4):
    142         byte = file.read(1)
    143         if not byte:
    144             raise EOFError
    145         x = x*256 + ord(byte)
    146     return x
    147 
    148 def _write_u32(file, x):
    149     data = []
    150     for i in range(4):
    151         d, m = divmod(x, 256)
    152         data.insert(0, int(m))
    153         x = d
    154     file.write(bytes(data))
    155 
    156 class Au_read:
    157 
    158     def __init__(self, f):
    159         if type(f) == type(''):
    160             import builtins
    161             f = builtins.open(f, 'rb')
    162             self._opened = True
    163         else:
    164             self._opened = False
    165         self.initfp(f)
    166 
    167     def __del__(self):
    168         if self._file:
    169             self.close()
    170 
    171     def __enter__(self):
    172         return self
    173 
    174     def __exit__(self, *args):
    175         self.close()
    176 
    177     def initfp(self, file):
    178         self._file = file
    179         self._soundpos = 0
    180         magic = int(_read_u32(file))
    181         if magic != AUDIO_FILE_MAGIC:
    182             raise Error('bad magic number')
    183         self._hdr_size = int(_read_u32(file))
    184         if self._hdr_size < 24:
    185             raise Error('header size too small')
    186         if self._hdr_size > 100:
    187             raise Error('header size ridiculously large')
    188         self._data_size = _read_u32(file)
    189         if self._data_size != AUDIO_UNKNOWN_SIZE:
    190             self._data_size = int(self._data_size)
    191         self._encoding = int(_read_u32(file))
    192         if self._encoding not in _simple_encodings:
    193             raise Error('encoding not (yet) supported')
    194         if self._encoding in (AUDIO_FILE_ENCODING_MULAW_8,
    195                   AUDIO_FILE_ENCODING_ALAW_8):
    196             self._sampwidth = 2
    197             self._framesize = 1
    198         elif self._encoding == AUDIO_FILE_ENCODING_LINEAR_8:
    199             self._framesize = self._sampwidth = 1
    200         elif self._encoding == AUDIO_FILE_ENCODING_LINEAR_16:
    201             self._framesize = self._sampwidth = 2
    202         elif self._encoding == AUDIO_FILE_ENCODING_LINEAR_24:
    203             self._framesize = self._sampwidth = 3
    204         elif self._encoding == AUDIO_FILE_ENCODING_LINEAR_32:
    205             self._framesize = self._sampwidth = 4
    206         else:
    207             raise Error('unknown encoding')
    208         self._framerate = int(_read_u32(file))
    209         self._nchannels = int(_read_u32(file))
    210         self._framesize = self._framesize * self._nchannels
    211         if self._hdr_size > 24:
    212             self._info = file.read(self._hdr_size - 24)
    213             self._info, _, _ = self._info.partition(b'\0')
    214         else:
    215             self._info = b''
    216         try:
    217             self._data_pos = file.tell()
    218         except (AttributeError, OSError):
    219             self._data_pos = None
    220 
    221     def getfp(self):
    222         return self._file
    223 
    224     def getnchannels(self):
    225         return self._nchannels
    226 
    227     def getsampwidth(self):
    228         return self._sampwidth
    229 
    230     def getframerate(self):
    231         return self._framerate
    232 
    233     def getnframes(self):
    234         if self._data_size == AUDIO_UNKNOWN_SIZE:
    235             return AUDIO_UNKNOWN_SIZE
    236         if self._encoding in _simple_encodings:
    237             return self._data_size // self._framesize
    238         return 0                # XXX--must do some arithmetic here
    239 
    240     def getcomptype(self):
    241         if self._encoding == AUDIO_FILE_ENCODING_MULAW_8:
    242             return 'ULAW'
    243         elif self._encoding == AUDIO_FILE_ENCODING_ALAW_8:
    244             return 'ALAW'
    245         else:
    246             return 'NONE'
    247 
    248     def getcompname(self):
    249         if self._encoding == AUDIO_FILE_ENCODING_MULAW_8:
    250             return 'CCITT G.711 u-law'
    251         elif self._encoding == AUDIO_FILE_ENCODING_ALAW_8:
    252             return 'CCITT G.711 A-law'
    253         else:
    254             return 'not compressed'
    255 
    256     def getparams(self):
    257         return _sunau_params(self.getnchannels(), self.getsampwidth(),
    258                   self.getframerate(), self.getnframes(),
    259                   self.getcomptype(), self.getcompname())
    260 
    261     def getmarkers(self):
    262         return None
    263 
    264     def getmark(self, id):
    265         raise Error('no marks')
    266 
    267     def readframes(self, nframes):
    268         if self._encoding in _simple_encodings:
    269             if nframes == AUDIO_UNKNOWN_SIZE:
    270                 data = self._file.read()
    271             else:
    272                 data = self._file.read(nframes * self._framesize)
    273             self._soundpos += len(data) // self._framesize
    274             if self._encoding == AUDIO_FILE_ENCODING_MULAW_8:
    275                 import audioop
    276                 data = audioop.ulaw2lin(data, self._sampwidth)
    277             return data
    278         return None             # XXX--not implemented yet
    279 
    280     def rewind(self):
    281         if self._data_pos is None:
    282             raise OSError('cannot seek')
    283         self._file.seek(self._data_pos)
    284         self._soundpos = 0
    285 
    286     def tell(self):
    287         return self._soundpos
    288 
    289     def setpos(self, pos):
    290         if pos < 0 or pos > self.getnframes():
    291             raise Error('position not in range')
    292         if self._data_pos is None:
    293             raise OSError('cannot seek')
    294         self._file.seek(self._data_pos + pos * self._framesize)
    295         self._soundpos = pos
    296 
    297     def close(self):
    298         file = self._file
    299         if file:
    300             self._file = None
    301             if self._opened:
    302                 file.close()
    303 
    304 class Au_write:
    305 
    306     def __init__(self, f):
    307         if type(f) == type(''):
    308             import builtins
    309             f = builtins.open(f, 'wb')
    310             self._opened = True
    311         else:
    312             self._opened = False
    313         self.initfp(f)
    314 
    315     def __del__(self):
    316         if self._file:
    317             self.close()
    318         self._file = None
    319 
    320     def __enter__(self):
    321         return self
    322 
    323     def __exit__(self, *args):
    324         self.close()
    325 
    326     def initfp(self, file):
    327         self._file = file
    328         self._framerate = 0
    329         self._nchannels = 0
    330         self._sampwidth = 0
    331         self._framesize = 0
    332         self._nframes = AUDIO_UNKNOWN_SIZE
    333         self._nframeswritten = 0
    334         self._datawritten = 0
    335         self._datalength = 0
    336         self._info = b''
    337         self._comptype = 'ULAW' # default is U-law
    338 
    339     def setnchannels(self, nchannels):
    340         if self._nframeswritten:
    341             raise Error('cannot change parameters after starting to write')
    342         if nchannels not in (1, 2, 4):
    343             raise Error('only 1, 2, or 4 channels supported')
    344         self._nchannels = nchannels
    345 
    346     def getnchannels(self):
    347         if not self._nchannels:
    348             raise Error('number of channels not set')
    349         return self._nchannels
    350 
    351     def setsampwidth(self, sampwidth):
    352         if self._nframeswritten:
    353             raise Error('cannot change parameters after starting to write')
    354         if sampwidth not in (1, 2, 3, 4):
    355             raise Error('bad sample width')
    356         self._sampwidth = sampwidth
    357 
    358     def getsampwidth(self):
    359         if not self._framerate:
    360             raise Error('sample width not specified')
    361         return self._sampwidth
    362 
    363     def setframerate(self, framerate):
    364         if self._nframeswritten:
    365             raise Error('cannot change parameters after starting to write')
    366         self._framerate = framerate
    367 
    368     def getframerate(self):
    369         if not self._framerate:
    370             raise Error('frame rate not set')
    371         return self._framerate
    372 
    373     def setnframes(self, nframes):
    374         if self._nframeswritten:
    375             raise Error('cannot change parameters after starting to write')
    376         if nframes < 0:
    377             raise Error('# of frames cannot be negative')
    378         self._nframes = nframes
    379 
    380     def getnframes(self):
    381         return self._nframeswritten
    382 
    383     def setcomptype(self, type, name):
    384         if type in ('NONE', 'ULAW'):
    385             self._comptype = type
    386         else:
    387             raise Error('unknown compression type')
    388 
    389     def getcomptype(self):
    390         return self._comptype
    391 
    392     def getcompname(self):
    393         if self._comptype == 'ULAW':
    394             return 'CCITT G.711 u-law'
    395         elif self._comptype == 'ALAW':
    396             return 'CCITT G.711 A-law'
    397         else:
    398             return 'not compressed'
    399 
    400     def setparams(self, params):
    401         nchannels, sampwidth, framerate, nframes, comptype, compname = params
    402         self.setnchannels(nchannels)
    403         self.setsampwidth(sampwidth)
    404         self.setframerate(framerate)
    405         self.setnframes(nframes)
    406         self.setcomptype(comptype, compname)
    407 
    408     def getparams(self):
    409         return _sunau_params(self.getnchannels(), self.getsampwidth(),
    410                   self.getframerate(), self.getnframes(),
    411                   self.getcomptype(), self.getcompname())
    412 
    413     def tell(self):
    414         return self._nframeswritten
    415 
    416     def writeframesraw(self, data):
    417         if not isinstance(data, (bytes, bytearray)):
    418             data = memoryview(data).cast('B')
    419         self._ensure_header_written()
    420         if self._comptype == 'ULAW':
    421             import audioop
    422             data = audioop.lin2ulaw(data, self._sampwidth)
    423         nframes = len(data) // self._framesize
    424         self._file.write(data)
    425         self._nframeswritten = self._nframeswritten + nframes
    426         self._datawritten = self._datawritten + len(data)
    427 
    428     def writeframes(self, data):
    429         self.writeframesraw(data)
    430         if self._nframeswritten != self._nframes or \
    431                   self._datalength != self._datawritten:
    432             self._patchheader()
    433 
    434     def close(self):
    435         if self._file:
    436             try:
    437                 self._ensure_header_written()
    438                 if self._nframeswritten != self._nframes or \
    439                         self._datalength != self._datawritten:
    440                     self._patchheader()
    441                 self._file.flush()
    442             finally:
    443                 file = self._file
    444                 self._file = None
    445                 if self._opened:
    446                     file.close()
    447 
    448     #
    449     # private methods
    450     #
    451 
    452     def _ensure_header_written(self):
    453         if not self._nframeswritten:
    454             if not self._nchannels:
    455                 raise Error('# of channels not specified')
    456             if not self._sampwidth:
    457                 raise Error('sample width not specified')
    458             if not self._framerate:
    459                 raise Error('frame rate not specified')
    460             self._write_header()
    461 
    462     def _write_header(self):
    463         if self._comptype == 'NONE':
    464             if self._sampwidth == 1:
    465                 encoding = AUDIO_FILE_ENCODING_LINEAR_8
    466                 self._framesize = 1
    467             elif self._sampwidth == 2:
    468                 encoding = AUDIO_FILE_ENCODING_LINEAR_16
    469                 self._framesize = 2
    470             elif self._sampwidth == 3:
    471                 encoding = AUDIO_FILE_ENCODING_LINEAR_24
    472                 self._framesize = 3
    473             elif self._sampwidth == 4:
    474                 encoding = AUDIO_FILE_ENCODING_LINEAR_32
    475                 self._framesize = 4
    476             else:
    477                 raise Error('internal error')
    478         elif self._comptype == 'ULAW':
    479             encoding = AUDIO_FILE_ENCODING_MULAW_8
    480             self._framesize = 1
    481         else:
    482             raise Error('internal error')
    483         self._framesize = self._framesize * self._nchannels
    484         _write_u32(self._file, AUDIO_FILE_MAGIC)
    485         header_size = 25 + len(self._info)
    486         header_size = (header_size + 7) & ~7
    487         _write_u32(self._file, header_size)
    488         if self._nframes == AUDIO_UNKNOWN_SIZE:
    489             length = AUDIO_UNKNOWN_SIZE
    490         else:
    491             length = self._nframes * self._framesize
    492         try:
    493             self._form_length_pos = self._file.tell()
    494         except (AttributeError, OSError):
    495             self._form_length_pos = None
    496         _write_u32(self._file, length)
    497         self._datalength = length
    498         _write_u32(self._file, encoding)
    499         _write_u32(self._file, self._framerate)
    500         _write_u32(self._file, self._nchannels)
    501         self._file.write(self._info)
    502         self._file.write(b'\0'*(header_size - len(self._info) - 24))
    503 
    504     def _patchheader(self):
    505         if self._form_length_pos is None:
    506             raise OSError('cannot seek')
    507         self._file.seek(self._form_length_pos)
    508         _write_u32(self._file, self._datawritten)
    509         self._datalength = self._datawritten
    510         self._file.seek(0, 2)
    511 
    512 def open(f, mode=None):
    513     if mode is None:
    514         if hasattr(f, 'mode'):
    515             mode = f.mode
    516         else:
    517             mode = 'rb'
    518     if mode in ('r', 'rb'):
    519         return Au_read(f)
    520     elif mode in ('w', 'wb'):
    521         return Au_write(f)
    522     else:
    523         raise Error("mode must be 'r', 'rb', 'w', or 'wb'")
    524 
    525 openfp = open
    526