"""Stuff to parse Sun and NeXT audio files.

An audio file consists of a header followed by the data.  The structure
of the header is as follows.

        +---------------+
        | magic word    |
        +---------------+
        | header size   |
        +---------------+
        | data size     |
        +---------------+
        | encoding      |
        +---------------+
        | sample rate   |
        +---------------+
        | # of channels |
        +---------------+
        | info          |
        |               |
        +---------------+

The magic word consists of the 4 characters '.snd'.  Apart from the
info field, all header fields are 4 bytes in size.  They are all
32-bit unsigned integers encoded in big-endian byte order.

The header size really gives the start of the data.
The data size is the physical size of the data.  From the other
parameters the number of frames can be calculated.
The encoding gives the way in which audio samples are encoded.
Possible values are listed below.
The info field currently consists of an ASCII string giving a
human-readable description of the audio file.  The info field is
padded with NUL bytes to the header size.

Usage.

Reading audio files:
        f = sunau.open(file, 'r')
where file is either the name of a file or an open file pointer.
The open file pointer must have methods read(), seek(), and close().
When the setpos() and rewind() methods are not used, the seek()
method is not necessary.

This returns an instance of a class with the following public methods:
        getnchannels()  -- returns number of audio channels (1 for
                           mono, 2 for stereo)
        getsampwidth()  -- returns sample width in bytes
        getframerate()  -- returns sampling frequency
        getnframes()    -- returns number of audio frames
        getcomptype()   -- returns compression type ('NONE', 'ULAW'
                           or 'ALAW')
        getcompname()   -- returns human-readable version of
                           compression type ('not compressed' matches 'NONE')
        getparams()     -- returns a namedtuple consisting of all of the
                           above in the above order
        getmarkers()    -- returns None (for compatibility with the
                           aifc module)
        getmark(id)     -- raises an error since the mark does not
                           exist (for compatibility with the aifc module)
        readframes(n)   -- returns at most n frames of audio
        rewind()        -- rewind to the beginning of the audio stream
        setpos(pos)     -- seek to the specified position
        tell()          -- return the current position
        close()         -- close the instance (make it unusable)
The position returned by tell() and the position given to setpos()
are compatible and have nothing to do with the actual position in the
file.
The close() method is called automatically when the class instance
is destroyed.

Writing audio files:
        f = sunau.open(file, 'w')
where file is either the name of a file or an open file pointer.
The open file pointer must have methods write(), tell(), seek(), and
close().

This returns an instance of a class with the following public methods:
        setnchannels(n) -- set the number of channels
        setsampwidth(n) -- set the sample width
        setframerate(n) -- set the frame rate
        setnframes(n)   -- set the number of frames
        setcomptype(type, name)
                        -- set the compression type and the
                           human-readable compression type
        setparams(tuple)-- set all parameters at once
        tell()          -- return current position in output file
        writeframesraw(data)
                        -- write audio frames without patching up the
                           file header
        writeframes(data)
                        -- write audio frames and patch up the file header
        close()         -- patch up the file header and close the
                           output file
You should set the parameters before the first writeframesraw or
writeframes.  The total number of frames does not need to be set,
but when it is set to the correct value, the header does not have to
be patched up.
It is best to first set all parameters, except possibly the
compression type, and then write audio frames using writeframesraw.
When all frames have been written, either call writeframes(b'') or
close() to patch up the sizes in the header.
The close() method is called automatically when the class instance
is destroyed.
"""

import builtins
from collections import namedtuple

_sunau_params = namedtuple('_sunau_params',
                           'nchannels sampwidth framerate nframes comptype compname')

# from <multimedia/audio_filehdr.h>
AUDIO_FILE_MAGIC = 0x2e736e64
AUDIO_FILE_ENCODING_MULAW_8 = 1
AUDIO_FILE_ENCODING_LINEAR_8 = 2
AUDIO_FILE_ENCODING_LINEAR_16 = 3
AUDIO_FILE_ENCODING_LINEAR_24 = 4
AUDIO_FILE_ENCODING_LINEAR_32 = 5
AUDIO_FILE_ENCODING_FLOAT = 6
AUDIO_FILE_ENCODING_DOUBLE = 7
AUDIO_FILE_ENCODING_ADPCM_G721 = 23
AUDIO_FILE_ENCODING_ADPCM_G722 = 24
AUDIO_FILE_ENCODING_ADPCM_G723_3 = 25
AUDIO_FILE_ENCODING_ADPCM_G723_5 = 26
AUDIO_FILE_ENCODING_ALAW_8 = 27

# from <multimedia/audio_hdr.h>
AUDIO_UNKNOWN_SIZE = 0xFFFFFFFF        # ((unsigned)(~0))

_simple_encodings = [AUDIO_FILE_ENCODING_MULAW_8,
                     AUDIO_FILE_ENCODING_LINEAR_8,
                     AUDIO_FILE_ENCODING_LINEAR_16,
                     AUDIO_FILE_ENCODING_LINEAR_24,
                     AUDIO_FILE_ENCODING_LINEAR_32,
                     AUDIO_FILE_ENCODING_ALAW_8]

# encoding -> (sample width reported to callers, bytes per sample in the file)
_encoding_sizes = {
    AUDIO_FILE_ENCODING_MULAW_8: (2, 1),
    AUDIO_FILE_ENCODING_ALAW_8: (2, 1),
    AUDIO_FILE_ENCODING_LINEAR_8: (1, 1),
    AUDIO_FILE_ENCODING_LINEAR_16: (2, 2),
    AUDIO_FILE_ENCODING_LINEAR_24: (3, 3),
    AUDIO_FILE_ENCODING_LINEAR_32: (4, 4),
}

# sample width in bytes -> linear encoding written when comptype is 'NONE'
_linear_encodings = {
    1: AUDIO_FILE_ENCODING_LINEAR_8,
    2: AUDIO_FILE_ENCODING_LINEAR_16,
    3: AUDIO_FILE_ENCODING_LINEAR_24,
    4: AUDIO_FILE_ENCODING_LINEAR_32,
}


class Error(Exception):
    """Raised for malformed audio files and invalid parameter settings."""


def _read_u32(file):
    """Read a big-endian unsigned 32-bit integer from *file*.

    Raises EOFError when the stream ends before four bytes were read.
    """
    data = b''
    # A single read() may return fewer bytes than requested on some
    # streams, so keep reading until the field is complete.
    while len(data) < 4:
        chunk = file.read(4 - len(data))
        if not chunk:
            raise EOFError
        data += chunk
    return int.from_bytes(data, 'big')


def _write_u32(file, x):
    """Write *x* to *file* as a big-endian unsigned 32-bit integer.

    Only the low 32 bits of the value are written.
    """
    file.write((int(x) & 0xFFFFFFFF).to_bytes(4, 'big'))


class Au_read:
    """Reader for Sun/NeXT audio files.

    The constructor accepts either a file name (opened in binary mode
    and closed again by close()) or an already open file object (left
    open by close()).
    """

    def __init__(self, f):
        # Set these first so that __del__ and close() are safe even when
        # opening the file or parsing the header fails.
        self._file = None
        self._opened = False
        if isinstance(f, str):
            f = builtins.open(f, 'rb')
            self._opened = True
        try:
            self.initfp(f)
        except BaseException:
            # Do not leak a file we opened ourselves; a caller-supplied
            # file object stays under the caller's control.
            self._file = None
            if self._opened:
                f.close()
            raise

    def __del__(self):
        if self._file:
            self.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def initfp(self, file):
        """Parse the header from *file* and position it at the audio data.

        Raises Error for a bad magic number, an implausible header size,
        an unsupported encoding or a zero channel count, and EOFError
        when the header is truncated.
        """
        self._file = file
        self._soundpos = 0
        magic = _read_u32(file)
        if magic != AUDIO_FILE_MAGIC:
            raise Error('bad magic number')
        self._hdr_size = _read_u32(file)
        if self._hdr_size < 24:
            raise Error('header size too small')
        if self._hdr_size > 100:
            raise Error('header size ridiculously large')
        self._data_size = _read_u32(file)
        self._encoding = _read_u32(file)
        if self._encoding not in _simple_encodings:
            raise Error('encoding not (yet) supported')
        try:
            self._sampwidth, self._framesize = _encoding_sizes[self._encoding]
        except KeyError:
            raise Error('unknown encoding') from None
        self._framerate = _read_u32(file)
        self._nchannels = _read_u32(file)
        if not self._nchannels:
            # A zero channel count would make the frame size zero and
            # every later frame computation divide by zero.
            raise Error('bad # of channels')
        self._framesize = self._framesize * self._nchannels
        if self._hdr_size > 24:
            info = file.read(self._hdr_size - 24)
            # The info field is NUL padded; keep the text before the pad.
            self._info = info.partition(b'\0')[0]
        else:
            self._info = b''
        try:
            self._data_pos = file.tell()
        except (AttributeError, OSError):
            # Unseekable stream: rewind() and setpos() will refuse to work.
            self._data_pos = None

    def getfp(self):
        """Return the underlying file object (None once closed)."""
        return self._file

    def getnchannels(self):
        """Return the number of audio channels."""
        return self._nchannels

    def getsampwidth(self):
        """Return the sample width in bytes."""
        return self._sampwidth

    def getframerate(self):
        """Return the sampling frequency."""
        return self._framerate

    def getnframes(self):
        """Return the number of frames, or AUDIO_UNKNOWN_SIZE if unknown."""
        if self._data_size == AUDIO_UNKNOWN_SIZE:
            return AUDIO_UNKNOWN_SIZE
        if self._encoding in _simple_encodings:
            return self._data_size // self._framesize
        return 0                # XXX--must do some arithmetic here

    def getcomptype(self):
        """Return the compression type: 'ULAW', 'ALAW' or 'NONE'."""
        if self._encoding == AUDIO_FILE_ENCODING_MULAW_8:
            return 'ULAW'
        if self._encoding == AUDIO_FILE_ENCODING_ALAW_8:
            return 'ALAW'
        return 'NONE'

    def getcompname(self):
        """Return a human-readable name for the compression type."""
        if self._encoding == AUDIO_FILE_ENCODING_MULAW_8:
            return 'CCITT G.711 u-law'
        if self._encoding == AUDIO_FILE_ENCODING_ALAW_8:
            return 'CCITT G.711 A-law'
        return 'not compressed'

    def getparams(self):
        """Return all stream parameters as a _sunau_params namedtuple."""
        return _sunau_params(self.getnchannels(), self.getsampwidth(),
                             self.getframerate(), self.getnframes(),
                             self.getcomptype(), self.getcompname())

    def getmarkers(self):
        """Return None (markers are not supported)."""
        return None

    def getmark(self, id):
        """Always raise Error (markers are not supported)."""
        raise Error('no marks')

    def readframes(self, nframes):
        """Read and return at most *nframes* frames of audio as bytes.

        Passing AUDIO_UNKNOWN_SIZE reads everything that is left.
        u-law data is expanded to linear samples via audioop.
        """
        if self._encoding not in _simple_encodings:
            return None             # XXX--not implemented yet
        if nframes == AUDIO_UNKNOWN_SIZE:
            data = self._file.read()
        else:
            data = self._file.read(nframes * self._framesize)
        self._soundpos += len(data) // self._framesize
        if self._encoding == AUDIO_FILE_ENCODING_MULAW_8:
            import audioop
            data = audioop.ulaw2lin(data, self._sampwidth)
        # NOTE(review): A-law data is returned as stored (one byte per
        # sample) even though getsampwidth() reports 2 for it.
        return data

    def rewind(self):
        """Seek back to the first frame; raise OSError if unseekable."""
        if self._data_pos is None:
            raise OSError('cannot seek')
        self._file.seek(self._data_pos)
        self._soundpos = 0

    def tell(self):
        """Return the current position, in frames."""
        return self._soundpos

    def setpos(self, pos):
        """Seek to frame *pos*.

        Raises Error when *pos* is out of range and OSError when the
        underlying stream cannot seek.
        """
        if pos < 0 or pos > self.getnframes():
            raise Error('position not in range')
        if self._data_pos is None:
            raise OSError('cannot seek')
        self._file.seek(self._data_pos + pos * self._framesize)
        self._soundpos = pos

    def close(self):
        """Detach from the file, closing it only if this object opened it."""
        file = self._file
        if file:
            self._file = None
            if self._opened:
                file.close()


class Au_write:
    """Writer for Sun/NeXT audio files.

    The constructor accepts either a file name (opened in binary mode
    and closed again by close()) or an already open file object (left
    open by close()).
    """

    def __init__(self, f):
        # Set these first so that __del__ is safe even when opening fails.
        self._file = None
        self._opened = False
        if isinstance(f, str):
            f = builtins.open(f, 'wb')
            self._opened = True
        try:
            self.initfp(f)
        except BaseException:
            self._file = None
            if self._opened:
                f.close()
            raise

    def __del__(self):
        if self._file:
            self.close()
        self._file = None

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def initfp(self, file):
        """Attach to *file* and reset all stream parameters."""
        self._file = file
        self._framerate = 0
        self._nchannels = 0
        self._sampwidth = 0
        self._framesize = 0
        self._nframes = AUDIO_UNKNOWN_SIZE
        self._nframeswritten = 0
        self._datawritten = 0
        self._datalength = 0
        self._info = b''
        self._comptype = 'ULAW' # default is U-law
        # Tracked separately from the frame count so that writing zero
        # frames does not cause the header to be emitted a second time.
        self._headerwritten = False

    def _check_not_started(self):
        # Parameters are frozen once audio frames have been written.
        if self._nframeswritten:
            raise Error('cannot change parameters after starting to write')

    def setnchannels(self, nchannels):
        """Set the number of channels (1, 2 or 4)."""
        self._check_not_started()
        if nchannels not in (1, 2, 4):
            raise Error('only 1, 2, or 4 channels supported')
        self._nchannels = nchannels

    def getnchannels(self):
        """Return the number of channels; raise Error if not set."""
        if not self._nchannels:
            raise Error('number of channels not set')
        return self._nchannels

    def setsampwidth(self, sampwidth):
        """Set the sample width in bytes (1, 2, 3 or 4)."""
        self._check_not_started()
        if sampwidth not in (1, 2, 3, 4):
            raise Error('bad sample width')
        self._sampwidth = sampwidth

    def getsampwidth(self):
        """Return the sample width in bytes; raise Error if not set."""
        if not self._sampwidth:
            raise Error('sample width not specified')
        return self._sampwidth

    def setframerate(self, framerate):
        """Set the frame rate."""
        self._check_not_started()
        self._framerate = framerate

    def getframerate(self):
        """Return the frame rate; raise Error if not set."""
        if not self._framerate:
            raise Error('frame rate not set')
        return self._framerate

    def setnframes(self, nframes):
        """Announce the number of frames that will be written."""
        self._check_not_started()
        if nframes < 0:
            raise Error('# of frames cannot be negative')
        self._nframes = nframes

    def getnframes(self):
        """Return the number of frames written so far."""
        return self._nframeswritten

    def setcomptype(self, type, name):
        """Set the compression type ('NONE' or 'ULAW'); *name* is ignored."""
        if type in ('NONE', 'ULAW'):
            self._comptype = type
        else:
            raise Error('unknown compression type')

    def getcomptype(self):
        """Return the compression type."""
        return self._comptype

    def getcompname(self):
        """Return a human-readable name for the compression type."""
        if self._comptype == 'ULAW':
            return 'CCITT G.711 u-law'
        if self._comptype == 'ALAW':
            return 'CCITT G.711 A-law'
        return 'not compressed'

    def setparams(self, params):
        """Set all parameters from a 6-tuple in getparams() order."""
        nchannels, sampwidth, framerate, nframes, comptype, compname = params
        self.setnchannels(nchannels)
        self.setsampwidth(sampwidth)
        self.setframerate(framerate)
        self.setnframes(nframes)
        self.setcomptype(comptype, compname)

    def getparams(self):
        """Return all stream parameters as a _sunau_params namedtuple."""
        return _sunau_params(self.getnchannels(), self.getsampwidth(),
                             self.getframerate(), self.getnframes(),
                             self.getcomptype(), self.getcompname())

    def tell(self):
        """Return the current position, in frames written."""
        return self._nframeswritten

    def writeframesraw(self, data):
        """Write audio frames without patching up the header sizes.

        The header is written first if that has not happened yet.  With
        compression type 'ULAW' the linear input is compressed via
        audioop before being written.
        """
        if not isinstance(data, (bytes, bytearray)):
            data = memoryview(data).cast('B')
        self._ensure_header_written()
        if self._comptype == 'ULAW':
            import audioop
            data = audioop.lin2ulaw(data, self._sampwidth)
        nframes = len(data) // self._framesize
        self._file.write(data)
        self._nframeswritten = self._nframeswritten + nframes
        self._datawritten = self._datawritten + len(data)

    def writeframes(self, data):
        """Write audio frames and patch the header if its sizes are stale."""
        self.writeframesraw(data)
        if self._nframeswritten != self._nframes or \
                  self._datalength != self._datawritten:
            self._patchheader()

    def close(self):
        """Finish the file: write/patch the header, flush, and detach.

        The underlying file is closed only if this object opened it.
        """
        if self._file:
            try:
                self._ensure_header_written()
                if self._nframeswritten != self._nframes or \
                        self._datalength != self._datawritten:
                    self._patchheader()
                self._file.flush()
            finally:
                file = self._file
                self._file = None
                if self._opened:
                    file.close()

    #
    # private methods
    #

    def _ensure_header_written(self):
        # Emit the header exactly once, after checking that every
        # mandatory parameter has been set.
        if self._headerwritten:
            return
        if not self._nchannels:
            raise Error('# of channels not specified')
        if not self._sampwidth:
            raise Error('sample width not specified')
        if not self._framerate:
            raise Error('frame rate not specified')
        self._write_header()
        self._headerwritten = True

    def _write_header(self):
        # Pick the on-disk encoding and per-sample size from the
        # compression type and sample width.
        if self._comptype == 'NONE':
            try:
                encoding = _linear_encodings[self._sampwidth]
            except KeyError:
                raise Error('internal error') from None
            self._framesize = self._sampwidth
        elif self._comptype == 'ULAW':
            encoding = AUDIO_FILE_ENCODING_MULAW_8
            self._framesize = 1
        else:
            raise Error('internal error')
        self._framesize = self._framesize * self._nchannels
        _write_u32(self._file, AUDIO_FILE_MAGIC)
        # 24 bytes of fixed fields plus the info text and at least one
        # NUL byte, rounded up to a multiple of 8.
        header_size = 25 + len(self._info)
        header_size = (header_size + 7) & ~7
        _write_u32(self._file, header_size)
        if self._nframes == AUDIO_UNKNOWN_SIZE:
            length = AUDIO_UNKNOWN_SIZE
        else:
            length = self._nframes * self._framesize
        try:
            # Remember where the data size lives so it can be patched.
            self._form_length_pos = self._file.tell()
        except (AttributeError, OSError):
            self._form_length_pos = None
        _write_u32(self._file, length)
        self._datalength = length
        _write_u32(self._file, encoding)
        _write_u32(self._file, self._framerate)
        _write_u32(self._file, self._nchannels)
        self._file.write(self._info)
        self._file.write(b'\0'*(header_size - len(self._info) - 24))

    def _patchheader(self):
        # Rewrite the data size field with the number of bytes actually
        # written, then return to the end of the file.
        if self._form_length_pos is None:
            raise OSError('cannot seek')
        self._file.seek(self._form_length_pos)
        _write_u32(self._file, self._datawritten)
        self._datalength = self._datawritten
        self._file.seek(0, 2)


def open(f, mode=None):
    """Open *f* (a file name or file object) for reading or writing.

    *mode* is one of 'r', 'rb', 'w' or 'wb'.  When omitted it is taken
    from f.mode if that attribute exists and defaults to 'rb' otherwise.
    Returns an Au_read or Au_write instance; raises Error for any other
    mode.
    """
    if mode is None:
        if hasattr(f, 'mode'):
            mode = f.mode
        else:
            mode = 'rb'
    if mode in ('r', 'rb'):
        return Au_read(f)
    elif mode in ('w', 'wb'):
        return Au_write(f)
    else:
        raise Error("mode must be 'r', 'rb', 'w', or 'wb'")


openfp = open