# Copyright 2012 Google Inc. All Rights Reserved.
# Author: mrdmnd@ (Matt Redmond)
"""A unit test for sending data to Bartlett. Requires poster module."""

import cookielib
import os
import signal
import subprocess
import tempfile
import time
import unittest
import urllib2

from poster.encode import multipart_encode
from poster.streaminghttp import register_openers

SERVER_DIR = '../.'
SERVER_URL = 'http://localhost:8080/'
# NOTE(review): the email value below contains ' (at] ', which looks like an
# obfuscated '@' and puts raw spaces into the URL. It is left untouched here
# because the intended address cannot be confirmed from this file -- verify.
GET = '_ah/login?email=googler (at] google.com&action=Login&continue=%s'
# Login URL template; '%s' is filled with the page to continue to after login.
AUTH_URL = SERVER_URL + GET


class ServerTest(unittest.TestCase):
    """A unit test for the bartlett server. Tests upload, serve, and delete."""

    def setUp(self):
        """Instantiate the files and server needed to test upload functionality."""
        self._server_proc = LaunchLocalServer()
        try:
            self._jar = cookielib.LWPCookieJar()
            self.opener = urllib2.build_opener(
                urllib2.HTTPCookieProcessor(self._jar))

            # delete=False: the file must outlive close(); tearDown removes it
            # explicitly by name.
            self.profile_data = tempfile.NamedTemporaryFile(delete=False)

            size = 16 * 1024
            self.profile_data.write(os.urandom(size))
            # Push the buffered bytes to disk and rewind, so that whoever reads
            # this file object next sees all `size` bytes from the beginning.
            self.profile_data.flush()
            self.profile_data.seek(0)
        except Exception:
            # unittest does not call tearDown when setUp raises, so the server
            # has to be stopped here or it would be leaked.
            _StopLocalServer(self._server_proc)
            raise

    def tearDown(self):
        """Remove the temporary profile file and shut the local server down."""
        try:
            self.profile_data.close()
            os.remove(self.profile_data.name)
        finally:
            # Stop the server even when the file cleanup above fails.
            _StopLocalServer(self._server_proc)

    def testIntegration(self):  # pylint: disable-msg=C6409
        """Run upload, list, serve and delete in order against one server."""
        key = self._testUpload()
        self._testListAll()
        self._testServeKey(key)
        self._testDelKey(key)

    def _testUpload(self):  # pylint: disable-msg=C6409
        """POST the profile data to 'upload' and return the response body.

        Returns:
          The non-empty response body; testIntegration uses it as the key for
          the serve and delete requests.
        """
        register_openers()
        data = {
            'profile_data': self.profile_data,
            'board': 'x86-zgb',
            'chromeos_version': '2409.0.2012_06_08_1114',
        }
        datagen, headers = multipart_encode(data)
        request = urllib2.Request(SERVER_URL + 'upload', datagen, headers)
        response = urllib2.urlopen(request).read()
        self.assertTrue(response)
        return response

    def _testListAll(self):  # pylint: disable-msg=C6409
        """Log in, fetch the 'serve' page and check that it is not empty."""
        request = urllib2.Request(AUTH_URL % (SERVER_URL + 'serve'))
        response = self.opener.open(request).read()
        self.assertTrue(response)

    def _testServeKey(self, key):  # pylint: disable-msg=C6409
        """Log in, fetch 'serve/<key>' and check that it is not empty.

        Args:
          key: The string returned by _testUpload.
        """
        request = urllib2.Request(AUTH_URL % (SERVER_URL + 'serve/' + key))
        response = self.opener.open(request).read()
        self.assertTrue(response)

    def _testDelKey(self, key):  # pylint: disable-msg=C6409
        """Request 'del/<key>', then check that the 'serve' page is empty.

        Args:
          key: The string returned by _testUpload.
        """
        # There is no response to a delete request, so its body is read and
        # discarded. We check the listAll page to ensure there is no data.
        request = urllib2.Request(AUTH_URL % (SERVER_URL + 'del/' + key))
        self.opener.open(request).read()
        request = urllib2.Request(AUTH_URL % (SERVER_URL + 'serve'))
        response = self.opener.open(request).read()
        self.assertFalse(response)


def _StopLocalServer(proc):
    """Interrupt the server process (if still running) and reap it."""
    if proc.poll() is None:
        os.kill(proc.pid, signal.SIGINT)
    # communicate() drains the stdout/stderr pipes and waits for exit, so the
    # child is reaped rather than left behind as a zombie.
    proc.communicate()


def LaunchLocalServer(timeout=60):
    """Launch a local dev_appserver and block until it answers HTTP requests.

    Args:
      timeout: Approximate number of seconds to wait for the server to come
        up. The server is polled about once per second.

    Returns:
      The subprocess.Popen object of the running server.

    Raises:
      RuntimeError: If the server process exits before it answers, or if it
        does not return a non-empty response within `timeout` seconds.
    """
    proc = subprocess.Popen(
        ['dev_appserver.py', '--clear_datastore', SERVER_DIR],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)
    deadline = time.time() + timeout
    # Wait for server to come up
    while True:
        time.sleep(1)
        if proc.poll() is not None:
            # The server died during start-up; polling would never succeed.
            proc.communicate()
            raise RuntimeError(
                'dev_appserver.py exited with code %s before serving %s'
                % (proc.returncode, SERVER_URL))
        try:
            request = urllib2.Request(SERVER_URL + 'serve')
            response = urllib2.urlopen(request).read()
        except urllib2.URLError:
            # Not accepting connections (or not serving the page) yet.
            response = None
        if response:
            return proc
        if time.time() >= deadline:
            proc.kill()
            proc.communicate()
            raise RuntimeError(
                'dev_appserver.py did not serve %s within %s seconds'
                % (SERVER_URL, timeout))


if __name__ == '__main__':
    unittest.main()