#!/usr/bin/python2.4
#
# Copyright (C) 2008 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Tests for divide_and_compress.py.

TODO(jmatt): Add tests for module methods.
"""

__author__ = 'jmatt (at] google.com (Justin Mattson)'

import os
import stat
import unittest
import zipfile

import divide_and_compress
import mox


class BagOfParts(object):
  """Just a generic class that I can use to assign random attributes to."""

  def NoOp(self):
    """Do nothing."""
    pass


class ValidAndRemoveTests(unittest.TestCase):
  """Test the ArchiveIsValid and RemoveLastFile methods."""

  def setUp(self):
    """Prepare the test.

    Construct some mock objects for use with the tests.
    """
    self.my_mox = mox.Mox()
    file1 = BagOfParts()
    file1.filename = 'file1.txt'
    file1.contents = 'This is a test file'
    file2 = BagOfParts()
    file2.filename = 'file2.txt'
    file2.contents = ('akdjfk;djsf;kljdslkfjslkdfjlsfjkdvn;kn;2389rtu4i'
                      'tn;ghf8:89H*hp748FJw80fu9WJFpwf39pujens;fihkhjfk'
                      'sdjfljkgsc n;iself')
    self.files = {'file1': file1, 'file2': file2}

  def tearDown(self):
    """Remove any stubs we've created."""
    self.my_mox.UnsetStubs()

  def testArchiveIsValid(self):
    """Test the DirectoryZipper.ArchiveIsValid method.

    Run two tests, one that we expect to pass and one that we expect to fail.
    """
    test_file_size = 1056730
    self.my_mox.StubOutWithMock(os, 'stat')
    os.stat('/foo/0.zip').AndReturn([test_file_size])
    # The stubbed os.stat returns a one-element list, so point ST_SIZE at
    # index 0 for the duration of the test (restored by UnsetStubs).
    self.my_mox.StubOutWithMock(stat, 'ST_SIZE')
    stat.ST_SIZE = 0
    os.stat('/baz/0.zip').AndReturn([test_file_size])
    mox.Replay(os.stat)

    # Size limit one byte below the reported archive size: must be invalid.
    test_target = divide_and_compress.DirectoryZipper('/foo/', 'bar',
                                                      test_file_size - 1, True)
    self.assertEqual(False, test_target.ArchiveIsValid(),
                     msg=('ERROR: Test failed, ArchiveIsValid should have '
                          'returned false, but returned true'))

    # Size limit one byte above the reported archive size: must be valid.
    test_target = divide_and_compress.DirectoryZipper('/baz/', 'bar',
                                                      test_file_size + 1, True)
    self.assertEqual(True, test_target.ArchiveIsValid(),
                     msg=('ERROR: Test failed, ArchiveIsValid should have'
                          ' returned true, but returned false'))

  def testRemoveLastFile(self):
    """Test DirectoryZipper.RemoveLastFile method.

    Construct a ZipInfo mock object with two records, verify that write is
    only called once on the new ZipFile object.
    """
    source = self.CreateZipSource()
    dest = self.CreateZipDestination()
    source_path = ''.join([os.getcwd(), '/0-old.zip'])
    dest_path = ''.join([os.getcwd(), '/0.zip'])
    test_target = divide_and_compress.DirectoryZipper(
        ''.join([os.getcwd(), '/']), 'dummy', 1024*1024, True)
    self.my_mox.StubOutWithMock(test_target, 'OpenZipFileAtPath')
    test_target.OpenZipFileAtPath(source_path, mode='r').AndReturn(source)
    test_target.OpenZipFileAtPath(dest_path,
                                  compress=zipfile.ZIP_DEFLATED,
                                  mode='w').AndReturn(dest)
    self.my_mox.StubOutWithMock(os, 'rename')
    os.rename(dest_path, source_path)
    self.my_mox.StubOutWithMock(os, 'unlink')
    os.unlink(source_path)

    self.my_mox.ReplayAll()
    test_target.RemoveLastFile()
    self.my_mox.VerifyAll()

  def CreateZipSource(self):
    """Create a mock zip source object.

    Read should only be called once, because the second file is the one
    being removed.

    Returns:
      A configured mock ZipFile object
    """
    # The archive holds two distinct members so that dropping the wrong one
    # would trigger an unexpected read('file2.txt') on this mock.
    members = [self.files['file1'], self.files['file2']]
    source_zip = self.my_mox.CreateMock(zipfile.ZipFile)
    source_zip.infolist().AndReturn(members)
    source_zip.infolist().AndReturn(members)
    source_zip.read(self.files['file1'].filename).AndReturn(
        self.files['file1'].contents)
    source_zip.close()
    return source_zip

  def CreateZipDestination(self):
    """Create mock destination zip.

    Write should only be called once, because there are two files in the
    source zip and we expect the second to be removed.

    Returns:
      A configured mock ZipFile object
    """
    # NOTE(review): this mock is built with mox.MockObject directly instead of
    # self.my_mox.CreateMock, so it is presumably not covered by
    # self.my_mox.ReplayAll()/VerifyAll() in testRemoveLastFile and the
    # expectations below may never be enforced -- confirm and consider
    # switching to self.my_mox.CreateMock.
    dest_zip = mox.MockObject(zipfile.ZipFile)
    dest_zip.writestr(self.files['file1'].filename,
                      self.files['file1'].contents)
    dest_zip.close()
    return dest_zip


class FixArchiveTests(unittest.TestCase):
  """Tests for the DirectoryZipper.FixArchive method."""

  def setUp(self):
    """Create a mock file object."""
    self.my_mox = mox.Mox()
    self.file1 = BagOfParts()
    self.file1.filename = 'file1.txt'
    self.file1.contents = 'This is a test file'

  def tearDown(self):
    """Unset any mocks that we've created."""
    self.my_mox.UnsetStubs()

  def _InitMultiFileData(self):
    """Create an array of mock file objects.

    Create three mock file objects that we can use for testing.
    """
    self.multi_file_dir = []

    file1 = BagOfParts()
    file1.filename = 'file1.txt'
    file1.contents = 'kjaskl;jkdjfkja;kjsnbvjnvnbuewklriujalvjsd'
    self.multi_file_dir.append(file1)

    file2 = BagOfParts()
    file2.filename = 'file2.txt'
    file2.contents = ('He entered the room and there in the center, it was.'
                      ' Looking upon the thing, suddenly he could not remember'
                      ' whether he had actually seen it before or whether'
                      ' his memory of it was merely the effect of something'
                      ' so often being imagined that it had long since become '
                      ' manifest in his mind.')
    self.multi_file_dir.append(file2)

    file3 = BagOfParts()
    file3.filename = 'file3.txt'
    file3.contents = 'Whoa, what is \'file2.txt\' all about?'
    self.multi_file_dir.append(file3)

  def testSingleFileArchive(self):
    """Test behavior of FixArchive when the archive has a single member.

    We expect that when this method is called with an archive that has a
    single member that it will return False and unlink the archive.
    """
    archive_path = ''.join([os.getcwd(), '/0.zip'])
    test_target = divide_and_compress.DirectoryZipper(
        ''.join([os.getcwd(), '/']), 'dummy', 1024*1024, True)
    self.my_mox.StubOutWithMock(test_target, 'OpenZipFileAtPath')
    test_target.OpenZipFileAtPath(archive_path, mode='r').AndReturn(
        self.CreateSingleFileMock())
    self.my_mox.StubOutWithMock(os, 'unlink')
    os.unlink(archive_path)
    self.my_mox.ReplayAll()
    self.assertEqual(False, test_target.FixArchive('SIZE'))
    self.my_mox.VerifyAll()

  def CreateSingleFileMock(self):
    """Create a mock ZipFile object for testSingleFileArchive.

    We just need it to return a single member infolist twice.

    Returns:
      A configured mock object
    """
    mock_zip = self.my_mox.CreateMock(zipfile.ZipFile)
    mock_zip.infolist().AndReturn([self.file1])
    mock_zip.infolist().AndReturn([self.file1])
    mock_zip.close()
    return mock_zip

  def testMultiFileArchive(self):
    """Test behavior of DirectoryZipper.FixArchive with a multi-file archive.

    We expect that FixArchive will rename the old archive, adding '-old' before
    '.zip', read all the members except the last one of '-old' into a new
    archive with the same name as the original, and then unlink the '-old' copy
    """
    archive_path = ''.join([os.getcwd(), '/0.zip'])
    test_target = divide_and_compress.DirectoryZipper(
        ''.join([os.getcwd(), '/']), 'dummy', 1024*1024, True)
    self.my_mox.StubOutWithMock(test_target, 'OpenZipFileAtPath')
    test_target.OpenZipFileAtPath(archive_path, mode='r').AndReturn(
        self.CreateMultiFileMock())
    # RemoveLastFile is stubbed out here; this test only checks that
    # FixArchive delegates to it with the archive path.
    self.my_mox.StubOutWithMock(test_target, 'RemoveLastFile')
    test_target.RemoveLastFile(archive_path)
    self.my_mox.StubOutWithMock(os, 'stat')
    os.stat(archive_path).AndReturn([49302])
    # The stubbed os.stat returns a one-element list, so ST_SIZE must be 0.
    self.my_mox.StubOutWithMock(stat, 'ST_SIZE')
    stat.ST_SIZE = 0
    self.my_mox.ReplayAll()
    self.assertEqual(True, test_target.FixArchive('SIZE'))
    self.my_mox.VerifyAll()

  def CreateMultiFileMock(self):
    """Create mock ZipFile object for use with testMultiFileArchive.

    The mock just needs to return the infolist mock that is prepared in
    _InitMultiFileData()

    Returns:
      A configured mock object
    """
    self._InitMultiFileData()
    mock_zip = self.my_mox.CreateMock(zipfile.ZipFile)
    mock_zip.infolist().AndReturn(self.multi_file_dir)
    mock_zip.close()
    return mock_zip


class AddFileToArchiveTest(unittest.TestCase):
  """Test behavior of method to add a file to an archive."""

  def setUp(self):
    """Setup the arguments for the DirectoryZipper object."""
    self.my_mox = mox.Mox()
    self.output_dir = '%s/' % os.getcwd()
    self.file_to_add = 'file.txt'
    self.input_dir = '/foo/bar/baz/'

  def tearDown(self):
    """Unset any stubs that we've created."""
    self.my_mox.UnsetStubs()

  def testAddFileToArchive(self):
    """Test the DirectoryZipper.AddFileToArchive method.

    We are testing a pretty trivial method, we just expect it to look at the
    file it's adding, so that it can possibly throw out a warning.
    """
    test_target = divide_and_compress.DirectoryZipper(self.output_dir,
                                                      self.input_dir,
                                                      1024*1024, True)
    self.my_mox.StubOutWithMock(test_target, 'OpenZipFileAtPath')
    archive_mock = self.CreateArchiveMock()
    test_target.OpenZipFileAtPath(
        ''.join([self.output_dir, '0.zip']),
        compress=zipfile.ZIP_DEFLATED).AndReturn(archive_mock)
    self.StubOutOsModule()
    self.my_mox.ReplayAll()
    test_target.AddFileToArchive(''.join([self.input_dir, self.file_to_add]),
                                 zipfile.ZIP_DEFLATED)
    self.my_mox.VerifyAll()

  def StubOutOsModule(self):
    """Create a mock for the os.path and os.stat objects.

    Create a stub that will return the type (file or directory) and size of the
    object that is to be added.
    """
    path_to_add = ''.join([self.input_dir, self.file_to_add])
    self.my_mox.StubOutWithMock(os.path, 'isfile')
    os.path.isfile(path_to_add).AndReturn(True)
    self.my_mox.StubOutWithMock(os, 'stat')
    os.stat(path_to_add).AndReturn([39480])
    # The stubbed os.stat returns a one-element list, so ST_SIZE must be 0.
    self.my_mox.StubOutWithMock(stat, 'ST_SIZE')
    stat.ST_SIZE = 0

  def CreateArchiveMock(self):
    """Create a mock ZipFile for use with testAddFileToArchive.

    Just verify that write is called with the file we expect and that the
    archive is closed after the file addition

    Returns:
      A configured mock object
    """
    archive_mock = self.my_mox.CreateMock(zipfile.ZipFile)
    archive_mock.write(''.join([self.input_dir, self.file_to_add]),
                       self.file_to_add)
    archive_mock.close()
    return archive_mock


class CompressDirectoryTest(unittest.TestCase):
  """Test the master method of the class.

  Testing with the following directory structure.
  /dir1/
  /dir1/file1.txt
  /dir1/file2.txt
  /dir1/dir2/
  /dir1/dir2/dir3/
  /dir1/dir2/dir4/
  /dir1/dir2/dir4/file3.txt
  /dir1/dir5/
  /dir1/dir5/file4.txt
  /dir1/dir5/file5.txt
  /dir1/dir5/file6.txt
  /dir1/dir5/file7.txt
  /dir1/dir6/
  /dir1/dir6/file8.txt

  file1.txt, file2.txt, file3.txt should be in 0.zip
  file4.txt should be in 1.zip
  file5.txt, file6.txt should be in 2.zip
  file7.txt will not be stored since it will be too large compressed
  file8.txt should be in 3.zip
  """

  def setUp(self):
    """Setup all the mocks for this test."""
    self.my_mox = mox.Mox()

    self.base_dir = '/dir1'
    self.output_path = '/out_dir/'
    self.test_target = divide_and_compress.DirectoryZipper(
        self.output_path, self.base_dir, 1024*1024, True)

    self.InitArgLists()
    self.InitOsDotPath()
    self.InitArchiveIsValid()
    self.InitWriteIndexRecord()
    self.InitAddFileToArchive()

  def tearDown(self):
    """Unset any stubs that we've created."""
    self.my_mox.UnsetStubs()

  def testCompressDirectory(self):
    """Test the DirectoryZipper.CompressDirectory method."""
    self.my_mox.ReplayAll()
    for arguments in self.argument_lists:
      self.test_target.CompressDirectory(None, arguments[0], arguments[1])
    self.my_mox.VerifyAll()

  def InitAddFileToArchive(self):
    """Setup mock for DirectoryZipper.AddFileToArchive.

    Make sure that the files are added in the order we expect.
    """
    self.my_mox.StubOutWithMock(self.test_target, 'AddFileToArchive')
    # Some files appear twice in a row: a second add is expected for them.
    expected_adds = [
        '/dir1/file1.txt',
        '/dir1/file2.txt',
        '/dir1/dir2/dir4/file3.txt',
        '/dir1/dir5/file4.txt',
        '/dir1/dir5/file4.txt',
        '/dir1/dir5/file5.txt',
        '/dir1/dir5/file5.txt',
        '/dir1/dir5/file6.txt',
        '/dir1/dir5/file7.txt',
        '/dir1/dir5/file7.txt',
        '/dir1/dir6/file8.txt',
    ]
    for file_path in expected_adds:
      self.test_target.AddFileToArchive(file_path, zipfile.ZIP_DEFLATED)

  def InitWriteIndexRecord(self):
    """Setup mock for DirectoryZipper.WriteIndexRecord."""
    self.my_mox.StubOutWithMock(self.test_target, 'WriteIndexRecord')

    # we are trying to compress 8 files, but we should only attempt to
    # write an index record 7 times, because one file is too large to be stored
    for result in [True, False, False, True, True, False, True]:
      self.test_target.WriteIndexRecord().AndReturn(result)

  def InitArchiveIsValid(self):
    """Mock out DirectoryZipper.ArchiveIsValid and DirectoryZipper.FixArchive.

    Mock these methods out such that file1, file2, and file3 go into one
    archive. file4 then goes into the next archive, file5 and file6 in the
    next, file 7 should appear too large to compress into an archive, and
    file8 goes into the final archive
    """
    self.my_mox.StubOutWithMock(self.test_target, 'ArchiveIsValid')
    self.my_mox.StubOutWithMock(self.test_target, 'FixArchive')
    self.test_target.ArchiveIsValid().AndReturn(True)
    self.test_target.ArchiveIsValid().AndReturn(True)
    self.test_target.ArchiveIsValid().AndReturn(True)

    # should be file4.txt
    self.test_target.ArchiveIsValid().AndReturn(False)
    self.test_target.FixArchive('SIZE').AndReturn(True)
    self.test_target.ArchiveIsValid().AndReturn(True)

    # should be file5.txt
    self.test_target.ArchiveIsValid().AndReturn(False)
    self.test_target.FixArchive('SIZE').AndReturn(True)
    self.test_target.ArchiveIsValid().AndReturn(True)
    self.test_target.ArchiveIsValid().AndReturn(True)

    # should be file7.txt
    self.test_target.ArchiveIsValid().AndReturn(False)
    self.test_target.FixArchive('SIZE').AndReturn(True)
    self.test_target.ArchiveIsValid().AndReturn(False)
    self.test_target.FixArchive('SIZE').AndReturn(False)
    self.test_target.ArchiveIsValid().AndReturn(True)

  def InitOsDotPath(self):
    """Mock out os.path.isfile.

    Mock this out so the things we want to appear as files appear as files and
    the things we want to appear as directories appear as directories. Also
    make sure that the order of file visits is as we expect (which is why
    InAnyOrder isn't used here).
    """
    self.my_mox.StubOutWithMock(os.path, 'isfile')
    # (path, is_file) pairs, in the exact order the lookups are expected.
    expected_lookups = [
        ('/dir1/dir2', False),
        ('/dir1/dir5', False),
        ('/dir1/dir6', False),
        ('/dir1/file1.txt', True),
        ('/dir1/file2.txt', True),
        ('/dir1/dir2/dir3', False),
        ('/dir1/dir2/dir4', False),
        ('/dir1/dir2/dir4/file3.txt', True),
        ('/dir1/dir5/file4.txt', True),
        ('/dir1/dir5/file4.txt', True),
        ('/dir1/dir5/file5.txt', True),
        ('/dir1/dir5/file5.txt', True),
        ('/dir1/dir5/file6.txt', True),
        ('/dir1/dir5/file7.txt', True),
        ('/dir1/dir5/file7.txt', True),
        ('/dir1/dir6/file8.txt', True),
    ]
    for path, is_file in expected_lookups:
      os.path.isfile(path).AndReturn(is_file)

  def InitArgLists(self):
    """Create the directory path => directory contents mappings."""
    self.argument_lists = []
    self.argument_lists.append(['/dir1',
                                ['file1.txt', 'file2.txt', 'dir2', 'dir5',
                                 'dir6']])
    self.argument_lists.append(['/dir1/dir2', ['dir3', 'dir4']])
    self.argument_lists.append(['/dir1/dir2/dir3', []])
    self.argument_lists.append(['/dir1/dir2/dir4', ['file3.txt']])
    self.argument_lists.append(['/dir1/dir5',
                                ['file4.txt', 'file5.txt', 'file6.txt',
                                 'file7.txt']])
    self.argument_lists.append(['/dir1/dir6', ['file8.txt']])


if __name__ == '__main__':
  unittest.main()