1 from test.support import (TESTFN, run_unittest, import_module, unlink, 2 requires, _2G, _4G, gc_collect, cpython_only) 3 import unittest 4 import os 5 import re 6 import itertools 7 import socket 8 import sys 9 import weakref 10 11 # Skip test if we can't import mmap. 12 mmap = import_module('mmap') 13 14 PAGESIZE = mmap.PAGESIZE 15 16 class MmapTests(unittest.TestCase): 17 18 def setUp(self): 19 if os.path.exists(TESTFN): 20 os.unlink(TESTFN) 21 22 def tearDown(self): 23 try: 24 os.unlink(TESTFN) 25 except OSError: 26 pass 27 28 def test_basic(self): 29 # Test mmap module on Unix systems and Windows 30 31 # Create a file to be mmap'ed. 32 f = open(TESTFN, 'bw+') 33 try: 34 # Write 2 pages worth of data to the file 35 f.write(b'\0'* PAGESIZE) 36 f.write(b'foo') 37 f.write(b'\0'* (PAGESIZE-3) ) 38 f.flush() 39 m = mmap.mmap(f.fileno(), 2 * PAGESIZE) 40 finally: 41 f.close() 42 43 # Simple sanity checks 44 45 tp = str(type(m)) # SF bug 128713: segfaulted on Linux 46 self.assertEqual(m.find(b'foo'), PAGESIZE) 47 48 self.assertEqual(len(m), 2*PAGESIZE) 49 50 self.assertEqual(m[0], 0) 51 self.assertEqual(m[0:3], b'\0\0\0') 52 53 # Shouldn't crash on boundary (Issue #5292) 54 self.assertRaises(IndexError, m.__getitem__, len(m)) 55 self.assertRaises(IndexError, m.__setitem__, len(m), b'\0') 56 57 # Modify the file's content 58 m[0] = b'3'[0] 59 m[PAGESIZE +3: PAGESIZE +3+3] = b'bar' 60 61 # Check that the modification worked 62 self.assertEqual(m[0], b'3'[0]) 63 self.assertEqual(m[0:3], b'3\0\0') 64 self.assertEqual(m[PAGESIZE-1 : PAGESIZE + 7], b'\0foobar\0') 65 66 m.flush() 67 68 # Test doing a regular expression match in an mmap'ed file 69 match = re.search(b'[A-Za-z]+', m) 70 if match is None: 71 self.fail('regex match on mmap failed!') 72 else: 73 start, end = match.span(0) 74 length = end - start 75 76 self.assertEqual(start, PAGESIZE) 77 self.assertEqual(end, PAGESIZE + 6) 78 79 # test seeking around (try to overflow the seek implementation) 80 m.seek(0,0) 81 self.assertEqual(m.tell(), 0) 82 m.seek(42,1) 83 self.assertEqual(m.tell(), 42) 84 m.seek(0,2) 85 self.assertEqual(m.tell(), len(m)) 86 87 # Try to seek to negative position... 88 self.assertRaises(ValueError, m.seek, -1) 89 90 # Try to seek beyond end of mmap... 91 self.assertRaises(ValueError, m.seek, 1, 2) 92 93 # Try to seek to negative position... 94 self.assertRaises(ValueError, m.seek, -len(m)-1, 2) 95 96 # Try resizing map 97 try: 98 m.resize(512) 99 except SystemError: 100 # resize() not supported 101 # No messages are printed, since the output of this test suite 102 # would then be different across platforms. 103 pass 104 else: 105 # resize() is supported 106 self.assertEqual(len(m), 512) 107 # Check that we can no longer seek beyond the new size. 108 self.assertRaises(ValueError, m.seek, 513, 0) 109 110 # Check that the underlying file is truncated too 111 # (bug #728515) 112 f = open(TESTFN, 'rb') 113 try: 114 f.seek(0, 2) 115 self.assertEqual(f.tell(), 512) 116 finally: 117 f.close() 118 self.assertEqual(m.size(), 512) 119 120 m.close() 121 122 def test_access_parameter(self): 123 # Test for "access" keyword parameter 124 mapsize = 10 125 with open(TESTFN, "wb") as fp: 126 fp.write(b"a"*mapsize) 127 with open(TESTFN, "rb") as f: 128 m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_READ) 129 self.assertEqual(m[:], b'a'*mapsize, "Readonly memory map data incorrect.") 130 131 # Ensuring that readonly mmap can't be slice assigned 132 try: 133 m[:] = b'b'*mapsize 134 except TypeError: 135 pass 136 else: 137 self.fail("Able to write to readonly memory map") 138 139 # Ensuring that readonly mmap can't be item assigned 140 try: 141 m[0] = b'b' 142 except TypeError: 143 pass 144 else: 145 self.fail("Able to write to readonly memory map") 146 147 # Ensuring that readonly mmap can't be write() to 148 try: 149 m.seek(0,0) 150 m.write(b'abc') 151 except TypeError: 152 pass 153 else: 154 self.fail("Able to write to readonly memory map") 155 156 # Ensuring that readonly mmap can't be write_byte() to 157 try: 158 m.seek(0,0) 159 m.write_byte(b'd') 160 except TypeError: 161 pass 162 else: 163 self.fail("Able to write to readonly memory map") 164 165 # Ensuring that readonly mmap can't be resized 166 try: 167 m.resize(2*mapsize) 168 except SystemError: # resize is not universally supported 169 pass 170 except TypeError: 171 pass 172 else: 173 self.fail("Able to resize readonly memory map") 174 with open(TESTFN, "rb") as fp: 175 self.assertEqual(fp.read(), b'a'*mapsize, 176 "Readonly memory map data file was modified") 177 178 # Opening mmap with size too big 179 with open(TESTFN, "r+b") as f: 180 try: 181 m = mmap.mmap(f.fileno(), mapsize+1) 182 except ValueError: 183 # we do not expect a ValueError on Windows 184 # CAUTION: This also changes the size of the file on disk, and 185 # later tests assume that the length hasn't changed. We need to 186 # repair that. 187 if sys.platform.startswith('win'): 188 self.fail("Opening mmap with size+1 should work on Windows.") 189 else: 190 # we expect a ValueError on Unix, but not on Windows 191 if not sys.platform.startswith('win'): 192 self.fail("Opening mmap with size+1 should raise ValueError.") 193 m.close() 194 if sys.platform.startswith('win'): 195 # Repair damage from the resizing test. 196 with open(TESTFN, 'r+b') as f: 197 f.truncate(mapsize) 198 199 # Opening mmap with access=ACCESS_WRITE 200 with open(TESTFN, "r+b") as f: 201 m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_WRITE) 202 # Modifying write-through memory map 203 m[:] = b'c'*mapsize 204 self.assertEqual(m[:], b'c'*mapsize, 205 "Write-through memory map memory not updated properly.") 206 m.flush() 207 m.close() 208 with open(TESTFN, 'rb') as f: 209 stuff = f.read() 210 self.assertEqual(stuff, b'c'*mapsize, 211 "Write-through memory map data file not updated properly.") 212 213 # Opening mmap with access=ACCESS_COPY 214 with open(TESTFN, "r+b") as f: 215 m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_COPY) 216 # Modifying copy-on-write memory map 217 m[:] = b'd'*mapsize 218 self.assertEqual(m[:], b'd' * mapsize, 219 "Copy-on-write memory map data not written correctly.") 220 m.flush() 221 with open(TESTFN, "rb") as fp: 222 self.assertEqual(fp.read(), b'c'*mapsize, 223 "Copy-on-write test data file should not be modified.") 224 # Ensuring copy-on-write maps cannot be resized 225 self.assertRaises(TypeError, m.resize, 2*mapsize) 226 m.close() 227 228 # Ensuring invalid access parameter raises exception 229 with open(TESTFN, "r+b") as f: 230 self.assertRaises(ValueError, mmap.mmap, f.fileno(), mapsize, access=4) 231 232 if os.name == "posix": 233 # Try incompatible flags, prot and access parameters. 234 with open(TESTFN, "r+b") as f: 235 self.assertRaises(ValueError, mmap.mmap, f.fileno(), mapsize, 236 flags=mmap.MAP_PRIVATE, 237 prot=mmap.PROT_READ, access=mmap.ACCESS_WRITE) 238 239 # Try writing with PROT_EXEC and without PROT_WRITE 240 prot = mmap.PROT_READ | getattr(mmap, 'PROT_EXEC', 0) 241 with open(TESTFN, "r+b") as f: 242 m = mmap.mmap(f.fileno(), mapsize, prot=prot) 243 self.assertRaises(TypeError, m.write, b"abcdef") 244 self.assertRaises(TypeError, m.write_byte, 0) 245 m.close() 246 247 def test_bad_file_desc(self): 248 # Try opening a bad file descriptor... 249 self.assertRaises(OSError, mmap.mmap, -2, 4096) 250 251 def test_tougher_find(self): 252 # Do a tougher .find() test. SF bug 515943 pointed out that, in 2.2, 253 # searching for data with embedded \0 bytes didn't work. 254 with open(TESTFN, 'wb+') as f: 255 256 data = b'aabaac\x00deef\x00\x00aa\x00' 257 n = len(data) 258 f.write(data) 259 f.flush() 260 m = mmap.mmap(f.fileno(), n) 261 262 for start in range(n+1): 263 for finish in range(start, n+1): 264 slice = data[start : finish] 265 self.assertEqual(m.find(slice), data.find(slice)) 266 self.assertEqual(m.find(slice + b'x'), -1) 267 m.close() 268 269 def test_find_end(self): 270 # test the new 'end' parameter works as expected 271 f = open(TESTFN, 'wb+') 272 data = b'one two ones' 273 n = len(data) 274 f.write(data) 275 f.flush() 276 m = mmap.mmap(f.fileno(), n) 277 f.close() 278 279 self.assertEqual(m.find(b'one'), 0) 280 self.assertEqual(m.find(b'ones'), 8) 281 self.assertEqual(m.find(b'one', 0, -1), 0) 282 self.assertEqual(m.find(b'one', 1), 8) 283 self.assertEqual(m.find(b'one', 1, -1), 8) 284 self.assertEqual(m.find(b'one', 1, -2), -1) 285 self.assertEqual(m.find(bytearray(b'one')), 0) 286 287 288 def test_rfind(self): 289 # test the new 'end' parameter works as expected 290 f = open(TESTFN, 'wb+') 291 data = b'one two ones' 292 n = len(data) 293 f.write(data) 294 f.flush() 295 m = mmap.mmap(f.fileno(), n) 296 f.close() 297 298 self.assertEqual(m.rfind(b'one'), 8) 299 self.assertEqual(m.rfind(b'one '), 0) 300 self.assertEqual(m.rfind(b'one', 0, -1), 8) 301 self.assertEqual(m.rfind(b'one', 0, -2), 0) 302 self.assertEqual(m.rfind(b'one', 1, -1), 8) 303 self.assertEqual(m.rfind(b'one', 1, -2), -1) 304 self.assertEqual(m.rfind(bytearray(b'one')), 8) 305 306 307 def test_double_close(self): 308 # make sure a double close doesn't crash on Solaris (Bug# 665913) 309 f = open(TESTFN, 'wb+') 310 311 f.write(2**16 * b'a') # Arbitrary character 312 f.close() 313 314 f = open(TESTFN, 'rb') 315 mf = mmap.mmap(f.fileno(), 2**16, access=mmap.ACCESS_READ) 316 mf.close() 317 mf.close() 318 f.close() 319 320 @unittest.skipUnless(hasattr(os, "stat"), "needs os.stat()") 321 def test_entire_file(self): 322 # test mapping of entire file by passing 0 for map length 323 f = open(TESTFN, "wb+") 324 325 f.write(2**16 * b'm') # Arbitrary character 326 f.close() 327 328 f = open(TESTFN, "rb+") 329 mf = mmap.mmap(f.fileno(), 0) 330 self.assertEqual(len(mf), 2**16, "Map size should equal file size.") 331 self.assertEqual(mf.read(2**16), 2**16 * b"m") 332 mf.close() 333 f.close() 334 335 @unittest.skipUnless(hasattr(os, "stat"), "needs os.stat()") 336 def test_length_0_offset(self): 337 # Issue #10916: test mapping of remainder of file by passing 0 for 338 # map length with an offset doesn't cause a segfault. 339 # NOTE: allocation granularity is currently 65536 under Win64, 340 # and therefore the minimum offset alignment. 341 with open(TESTFN, "wb") as f: 342 f.write((65536 * 2) * b'm') # Arbitrary character 343 344 with open(TESTFN, "rb") as f: 345 with mmap.mmap(f.fileno(), 0, offset=65536, access=mmap.ACCESS_READ) as mf: 346 self.assertRaises(IndexError, mf.__getitem__, 80000) 347 348 @unittest.skipUnless(hasattr(os, "stat"), "needs os.stat()") 349 def test_length_0_large_offset(self): 350 # Issue #10959: test mapping of a file by passing 0 for 351 # map length with a large offset doesn't cause a segfault. 352 with open(TESTFN, "wb") as f: 353 f.write(115699 * b'm') # Arbitrary character 354 355 with open(TESTFN, "w+b") as f: 356 self.assertRaises(ValueError, mmap.mmap, f.fileno(), 0, 357 offset=2147418112) 358 359 def test_move(self): 360 # make move works everywhere (64-bit format problem earlier) 361 f = open(TESTFN, 'wb+') 362 363 f.write(b"ABCDEabcde") # Arbitrary character 364 f.flush() 365 366 mf = mmap.mmap(f.fileno(), 10) 367 mf.move(5, 0, 5) 368 self.assertEqual(mf[:], b"ABCDEABCDE", "Map move should have duplicated front 5") 369 mf.close() 370 f.close() 371 372 # more excessive test 373 data = b"0123456789" 374 for dest in range(len(data)): 375 for src in range(len(data)): 376 for count in range(len(data) - max(dest, src)): 377 expected = data[:dest] + data[src:src+count] + data[dest+count:] 378 m = mmap.mmap(-1, len(data)) 379 m[:] = data 380 m.move(dest, src, count) 381 self.assertEqual(m[:], expected) 382 m.close() 383 384 # segfault test (Issue 5387) 385 m = mmap.mmap(-1, 100) 386 offsets = [-100, -1, 0, 1, 100] 387 for source, dest, size in itertools.product(offsets, offsets, offsets): 388 try: 389 m.move(source, dest, size) 390 except ValueError: 391 pass 392 393 offsets = [(-1, -1, -1), (-1, -1, 0), (-1, 0, -1), (0, -1, -1), 394 (-1, 0, 0), (0, -1, 0), (0, 0, -1)] 395 for source, dest, size in offsets: 396 self.assertRaises(ValueError, m.move, source, dest, size) 397 398 m.close() 399 400 m = mmap.mmap(-1, 1) # single byte 401 self.assertRaises(ValueError, m.move, 0, 0, 2) 402 self.assertRaises(ValueError, m.move, 1, 0, 1) 403 self.assertRaises(ValueError, m.move, 0, 1, 1) 404 m.move(0, 0, 1) 405 m.move(0, 0, 0) 406 407 408 def test_anonymous(self): 409 # anonymous mmap.mmap(-1, PAGE) 410 m = mmap.mmap(-1, PAGESIZE) 411 for x in range(PAGESIZE): 412 self.assertEqual(m[x], 0, 413 "anonymously mmap'ed contents should be zero") 414 415 for x in range(PAGESIZE): 416 b = x & 0xff 417 m[x] = b 418 self.assertEqual(m[x], b) 419 420 def test_read_all(self): 421 m = mmap.mmap(-1, 16) 422 self.addCleanup(m.close) 423 424 # With no parameters, or None or a negative argument, reads all 425 m.write(bytes(range(16))) 426 m.seek(0) 427 self.assertEqual(m.read(), bytes(range(16))) 428 m.seek(8) 429 self.assertEqual(m.read(), bytes(range(8, 16))) 430 m.seek(16) 431 self.assertEqual(m.read(), b'') 432 m.seek(3) 433 self.assertEqual(m.read(None), bytes(range(3, 16))) 434 m.seek(4) 435 self.assertEqual(m.read(-1), bytes(range(4, 16))) 436 m.seek(5) 437 self.assertEqual(m.read(-2), bytes(range(5, 16))) 438 m.seek(9) 439 self.assertEqual(m.read(-42), bytes(range(9, 16))) 440 441 def test_read_invalid_arg(self): 442 m = mmap.mmap(-1, 16) 443 self.addCleanup(m.close) 444 445 self.assertRaises(TypeError, m.read, 'foo') 446 self.assertRaises(TypeError, m.read, 5.5) 447 self.assertRaises(TypeError, m.read, [1, 2, 3]) 448 449 def test_extended_getslice(self): 450 # Test extended slicing by comparing with list slicing. 451 s = bytes(reversed(range(256))) 452 m = mmap.mmap(-1, len(s)) 453 m[:] = s 454 self.assertEqual(m[:], s) 455 indices = (0, None, 1, 3, 19, 300, -1, -2, -31, -300) 456 for start in indices: 457 for stop in indices: 458 # Skip step 0 (invalid) 459 for step in indices[1:]: 460 self.assertEqual(m[start:stop:step], 461 s[start:stop:step]) 462 463 def test_extended_set_del_slice(self): 464 # Test extended slicing by comparing with list slicing. 465 s = bytes(reversed(range(256))) 466 m = mmap.mmap(-1, len(s)) 467 indices = (0, None, 1, 3, 19, 300, -1, -2, -31, -300) 468 for start in indices: 469 for stop in indices: 470 # Skip invalid step 0 471 for step in indices[1:]: 472 m[:] = s 473 self.assertEqual(m[:], s) 474 L = list(s) 475 # Make sure we have a slice of exactly the right length, 476 # but with different data. 477 data = L[start:stop:step] 478 data = bytes(reversed(data)) 479 L[start:stop:step] = data 480 m[start:stop:step] = data 481 self.assertEqual(m[:], bytes(L)) 482 483 def make_mmap_file (self, f, halfsize): 484 # Write 2 pages worth of data to the file 485 f.write (b'\0' * halfsize) 486 f.write (b'foo') 487 f.write (b'\0' * (halfsize - 3)) 488 f.flush () 489 return mmap.mmap (f.fileno(), 0) 490 491 def test_empty_file (self): 492 f = open (TESTFN, 'w+b') 493 f.close() 494 with open(TESTFN, "rb") as f : 495 self.assertRaisesRegex(ValueError, 496 "cannot mmap an empty file", 497 mmap.mmap, f.fileno(), 0, 498 access=mmap.ACCESS_READ) 499 500 def test_offset (self): 501 f = open (TESTFN, 'w+b') 502 503 try: # unlink TESTFN no matter what 504 halfsize = mmap.ALLOCATIONGRANULARITY 505 m = self.make_mmap_file (f, halfsize) 506 m.close () 507 f.close () 508 509 mapsize = halfsize * 2 510 # Try invalid offset 511 f = open(TESTFN, "r+b") 512 for offset in [-2, -1, None]: 513 try: 514 m = mmap.mmap(f.fileno(), mapsize, offset=offset) 515 self.assertEqual(0, 1) 516 except (ValueError, TypeError, OverflowError): 517 pass 518 else: 519 self.assertEqual(0, 0) 520 f.close() 521 522 # Try valid offset, hopefully 8192 works on all OSes 523 f = open(TESTFN, "r+b") 524 m = mmap.mmap(f.fileno(), mapsize - halfsize, offset=halfsize) 525 self.assertEqual(m[0:3], b'foo') 526 f.close() 527 528 # Try resizing map 529 try: 530 m.resize(512) 531 except SystemError: 532 pass 533 else: 534 # resize() is supported 535 self.assertEqual(len(m), 512) 536 # Check that we can no longer seek beyond the new size. 537 self.assertRaises(ValueError, m.seek, 513, 0) 538 # Check that the content is not changed 539 self.assertEqual(m[0:3], b'foo') 540 541 # Check that the underlying file is truncated too 542 f = open(TESTFN, 'rb') 543 f.seek(0, 2) 544 self.assertEqual(f.tell(), halfsize + 512) 545 f.close() 546 self.assertEqual(m.size(), halfsize + 512) 547 548 m.close() 549 550 finally: 551 f.close() 552 try: 553 os.unlink(TESTFN) 554 except OSError: 555 pass 556 557 def test_subclass(self): 558 class anon_mmap(mmap.mmap): 559 def __new__(klass, *args, **kwargs): 560 return mmap.mmap.__new__(klass, -1, *args, **kwargs) 561 anon_mmap(PAGESIZE) 562 563 @unittest.skipUnless(hasattr(mmap, 'PROT_READ'), "needs mmap.PROT_READ") 564 def test_prot_readonly(self): 565 mapsize = 10 566 with open(TESTFN, "wb") as fp: 567 fp.write(b"a"*mapsize) 568 f = open(TESTFN, "rb") 569 m = mmap.mmap(f.fileno(), mapsize, prot=mmap.PROT_READ) 570 self.assertRaises(TypeError, m.write, "foo") 571 f.close() 572 573 def test_error(self): 574 self.assertIs(mmap.error, OSError) 575 576 def test_io_methods(self): 577 data = b"0123456789" 578 with open(TESTFN, "wb") as fp: 579 fp.write(b"x"*len(data)) 580 f = open(TESTFN, "r+b") 581 m = mmap.mmap(f.fileno(), len(data)) 582 f.close() 583 # Test write_byte() 584 for i in range(len(data)): 585 self.assertEqual(m.tell(), i) 586 m.write_byte(data[i]) 587 self.assertEqual(m.tell(), i+1) 588 self.assertRaises(ValueError, m.write_byte, b"x"[0]) 589 self.assertEqual(m[:], data) 590 # Test read_byte() 591 m.seek(0) 592 for i in range(len(data)): 593 self.assertEqual(m.tell(), i) 594 self.assertEqual(m.read_byte(), data[i]) 595 self.assertEqual(m.tell(), i+1) 596 self.assertRaises(ValueError, m.read_byte) 597 # Test read() 598 m.seek(3) 599 self.assertEqual(m.read(3), b"345") 600 self.assertEqual(m.tell(), 6) 601 # Test write() 602 m.seek(3) 603 m.write(b"bar") 604 self.assertEqual(m.tell(), 6) 605 self.assertEqual(m[:], b"012bar6789") 606 m.write(bytearray(b"baz")) 607 self.assertEqual(m.tell(), 9) 608 self.assertEqual(m[:], b"012barbaz9") 609 self.assertRaises(ValueError, m.write, b"ba") 610 611 def test_non_ascii_byte(self): 612 for b in (129, 200, 255): # > 128 613 m = mmap.mmap(-1, 1) 614 m.write_byte(b) 615 self.assertEqual(m[0], b) 616 m.seek(0) 617 self.assertEqual(m.read_byte(), b) 618 m.close() 619 620 @unittest.skipUnless(os.name == 'nt', 'requires Windows') 621 def test_tagname(self): 622 data1 = b"0123456789" 623 data2 = b"abcdefghij" 624 assert len(data1) == len(data2) 625 626 # Test same tag 627 m1 = mmap.mmap(-1, len(data1), tagname="foo") 628 m1[:] = data1 629 m2 = mmap.mmap(-1, len(data2), tagname="foo") 630 m2[:] = data2 631 self.assertEqual(m1[:], data2) 632 self.assertEqual(m2[:], data2) 633 m2.close() 634 m1.close() 635 636 # Test different tag 637 m1 = mmap.mmap(-1, len(data1), tagname="foo") 638 m1[:] = data1 639 m2 = mmap.mmap(-1, len(data2), tagname="boo") 640 m2[:] = data2 641 self.assertEqual(m1[:], data1) 642 self.assertEqual(m2[:], data2) 643 m2.close() 644 m1.close() 645 646 @cpython_only 647 @unittest.skipUnless(os.name == 'nt', 'requires Windows') 648 def test_sizeof(self): 649 m1 = mmap.mmap(-1, 100) 650 tagname = "foo" 651 m2 = mmap.mmap(-1, 100, tagname=tagname) 652 self.assertEqual(sys.getsizeof(m2), 653 sys.getsizeof(m1) + len(tagname) + 1) 654 655 @unittest.skipUnless(os.name == 'nt', 'requires Windows') 656 def test_crasher_on_windows(self): 657 # Should not crash (Issue 1733986) 658 m = mmap.mmap(-1, 1000, tagname="foo") 659 try: 660 mmap.mmap(-1, 5000, tagname="foo")[:] # same tagname, but larger size 661 except: 662 pass 663 m.close() 664 665 # Should not crash (Issue 5385) 666 with open(TESTFN, "wb") as fp: 667 fp.write(b"x"*10) 668 f = open(TESTFN, "r+b") 669 m = mmap.mmap(f.fileno(), 0) 670 f.close() 671 try: 672 m.resize(0) # will raise OSError 673 except: 674 pass 675 try: 676 m[:] 677 except: 678 pass 679 m.close() 680 681 @unittest.skipUnless(os.name == 'nt', 'requires Windows') 682 def test_invalid_descriptor(self): 683 # socket file descriptors are valid, but out of range 684 # for _get_osfhandle, causing a crash when validating the 685 # parameters to _get_osfhandle. 686 s = socket.socket() 687 try: 688 with self.assertRaises(OSError): 689 m = mmap.mmap(s.fileno(), 10) 690 finally: 691 s.close() 692 693 def test_context_manager(self): 694 with mmap.mmap(-1, 10) as m: 695 self.assertFalse(m.closed) 696 self.assertTrue(m.closed) 697 698 def test_context_manager_exception(self): 699 # Test that the OSError gets passed through 700 with self.assertRaises(Exception) as exc: 701 with mmap.mmap(-1, 10) as m: 702 raise OSError 703 self.assertIsInstance(exc.exception, OSError, 704 "wrong exception raised in context manager") 705 self.assertTrue(m.closed, "context manager failed") 706 707 def test_weakref(self): 708 # Check mmap objects are weakrefable 709 mm = mmap.mmap(-1, 16) 710 wr = weakref.ref(mm) 711 self.assertIs(wr(), mm) 712 del mm 713 gc_collect() 714 self.assertIs(wr(), None) 715 716 def test_write_returning_the_number_of_bytes_written(self): 717 mm = mmap.mmap(-1, 16) 718 self.assertEqual(mm.write(b""), 0) 719 self.assertEqual(mm.write(b"x"), 1) 720 self.assertEqual(mm.write(b"yz"), 2) 721 self.assertEqual(mm.write(b"python"), 6) 722 723 @unittest.skipIf(os.name == 'nt', 'cannot resize anonymous mmaps on Windows') 724 def test_resize_past_pos(self): 725 m = mmap.mmap(-1, 8192) 726 self.addCleanup(m.close) 727 m.read(5000) 728 try: 729 m.resize(4096) 730 except SystemError: 731 self.skipTest("resizing not supported") 732 self.assertEqual(m.read(14), b'') 733 self.assertRaises(ValueError, m.read_byte) 734 self.assertRaises(ValueError, m.write_byte, 42) 735 self.assertRaises(ValueError, m.write, b'abc') 736 737 def test_concat_repeat_exception(self): 738 m = mmap.mmap(-1, 16) 739 with self.assertRaises(TypeError): 740 m + m 741 with self.assertRaises(TypeError): 742 m * 2 743 744 745 class LargeMmapTests(unittest.TestCase): 746 747 def setUp(self): 748 unlink(TESTFN) 749 750 def tearDown(self): 751 unlink(TESTFN) 752 753 def _make_test_file(self, num_zeroes, tail): 754 if sys.platform[:3] == 'win' or sys.platform == 'darwin': 755 requires('largefile', 756 'test requires %s bytes and a long time to run' % str(0x180000000)) 757 f = open(TESTFN, 'w+b') 758 try: 759 f.seek(num_zeroes) 760 f.write(tail) 761 f.flush() 762 except (OSError, OverflowError, ValueError): 763 try: 764 f.close() 765 except (OSError, OverflowError): 766 pass 767 raise unittest.SkipTest("filesystem does not have largefile support") 768 return f 769 770 def test_large_offset(self): 771 with self._make_test_file(0x14FFFFFFF, b" ") as f: 772 with mmap.mmap(f.fileno(), 0, offset=0x140000000, access=mmap.ACCESS_READ) as m: 773 self.assertEqual(m[0xFFFFFFF], 32) 774 775 def test_large_filesize(self): 776 with self._make_test_file(0x17FFFFFFF, b" ") as f: 777 if sys.maxsize < 0x180000000: 778 # On 32 bit platforms the file is larger than sys.maxsize so 779 # mapping the whole file should fail -- Issue #16743 780 with self.assertRaises(OverflowError): 781 mmap.mmap(f.fileno(), 0x180000000, access=mmap.ACCESS_READ) 782 with self.assertRaises(ValueError): 783 mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) 784 with mmap.mmap(f.fileno(), 0x10000, access=mmap.ACCESS_READ) as m: 785 self.assertEqual(m.size(), 0x180000000) 786 787 # Issue 11277: mmap() with large (~4 GiB) sparse files crashes on OS X. 788 789 def _test_around_boundary(self, boundary): 790 tail = b' DEARdear ' 791 start = boundary - len(tail) // 2 792 end = start + len(tail) 793 with self._make_test_file(start, tail) as f: 794 with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as m: 795 self.assertEqual(m[start:end], tail) 796 797 @unittest.skipUnless(sys.maxsize > _4G, "test cannot run on 32-bit systems") 798 def test_around_2GB(self): 799 self._test_around_boundary(_2G) 800 801 @unittest.skipUnless(sys.maxsize > _4G, "test cannot run on 32-bit systems") 802 def test_around_4GB(self): 803 self._test_around_boundary(_4G) 804 805 806 def test_main(): 807 run_unittest(MmapTests, LargeMmapTests) 808 809 if __name__ == '__main__': 810 test_main() 811