#    Copyright 2014-2015 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


import re
import time
import logging
from copy import copy

from devlib.utils.serial_port import write_characters, TIMEOUT
from devlib.utils.types import boolean


logger = logging.getLogger('UEFI')

# ``basestring`` only exists on Python 2 (where it covers both ``str`` and
# ``unicode``); fall back to ``str`` so the module also works on Python 3.
try:
    string_types = basestring  # noqa: F821
except NameError:
    string_types = str


class UefiConfig(object):
    """
    Parameters describing a single UEFI boot entry.

    Attributes set on the instance: ``image_name``, ``image_args``,
    ``fdt_support`` (converted with ``boolean()``), ``initrd`` and
    ``fdt_path`` (the last two default to ``None``).

    """

    def __init__(self, config_dict):
        """
        :param config_dict: Either another ``UefiConfig`` (its attributes are
                            shallow-copied), or a mapping that must contain
                            ``'image_name'``, ``'image_args'`` and
                            ``'fdt_support'``, and may contain ``'initrd'``
                            and ``'fdt_path'``.

        :raises ValueError: If a mandatory key is missing, or if ``fdt_path``
                            is given while ``fdt_support`` is false.

        """
        if isinstance(config_dict, UefiConfig):
            self.__dict__ = copy(config_dict.__dict__)
            return

        try:
            self.image_name = config_dict['image_name']
            self.image_args = config_dict['image_args']
            self.fdt_support = boolean(config_dict['fdt_support'])
        except KeyError as e:
            # Format the missing key itself rather than the KeyError, whose
            # str() is already quoted and would yield "'image_name'".
            raise ValueError('Missing mandatory parameter for UEFI entry config: "{}"'.format(e.args[0]))
        self.initrd = config_dict.get('initrd')
        self.fdt_path = config_dict.get('fdt_path')
        if self.fdt_path and not self.fdt_support:
            raise ValueError('FDT path has been specfied for UEFI entry, when FDT support is "False"')


class UefiMenu(object):
    """
    Allows navigating UEFI menu over serial (it relies on a pexpect connection).

    """

    # A menu line of the form "[<index>] <option text>\r\n".
    option_regex = re.compile(r'^\[(\d+)\]\s+([^\r]+)\r\n', re.M)
    # A line starting with a non-whitespace character and ending with a colon.
    prompt_regex = re.compile(r'^(\S[^\r\n]+):\s*', re.M)
    # The error reported for a bad selection; captures the maximum valid index.
    invalid_regex = re.compile(r'Invalid input \(max (\d+)\)', re.M)

    load_delay = 1  # seconds
    default_timeout = 60  # seconds

    def __init__(self, conn, prompt='The default boot selection will start in'):
        """
        :param conn: A serial connection as returned by ``pexpect.spawn()``.
        :param prompt: The starting prompt to wait for during ``wait()``.

        """
        self.conn = conn
        self.start_prompt = prompt
        self.options = {}
        self.prompt = None
        self.attempting_invalid_retry = False

    def wait(self, timeout=default_timeout):
        """
        "Open" the UEFI menu by sending an interrupt on STDIN after seeing the
        starting prompt (configurable upon creation of the ``UefiMenu`` object).

        :param timeout: How long to wait for the starting prompt. It is not
                        forwarded to ``connect()``, which uses its own default
                        when reading the menu.

        """
        self.conn.expect(self.start_prompt, timeout)
        self.connect()

    def connect(self, timeout=default_timeout):
        """Nudge the serial connection, wait ``load_delay`` seconds, then
        parse the menu that is presented.

        :param timeout: Passed to ``read_menu()``.

        """
        self.nudge()
        time.sleep(self.load_delay)
        self.read_menu(timeout=timeout)

    def create_entry(self, name, config):
        """Create a new UEFI entry using the parameters. The menu is assumed
        to be at the top level. Upon return, the menu will be at the top level.

        :param name: The name to give the new entry.
        :param config: An object providing ``image_name``, ``fdt_support``,
                       ``initrd``, ``image_args`` and ``fdt_path`` attributes
                       (e.g. a ``UefiConfig``).

        """
        logger.debug('Creating UEFI entry %s', name)
        self.nudge()
        self.select('Boot Manager')
        self.select('Add Boot Device Entry')
        self.select('NOR Flash')
        self.enter(config.image_name)
        self.enter('y' if config.fdt_support else 'n')
        if config.initrd:
            self.enter('y')
            self.enter(config.initrd)
        else:
            self.enter('n')
        self.enter(config.image_args)
        self.enter(name)

        if config.fdt_path:
            self.select('Update FDT path')
            self.enter(config.fdt_path)

        self.select('Return to main menu')

    def delete_entry(self, name):
        """Delete the specified UEFI entry. The menu is assumed
        to be at the top level. Upon return, the menu will be at the top level.

        :param name: Text (treated as a regex) identifying the entry to remove.

        """
        logger.debug('Removing UEFI entry %s', name)
        self.nudge()
        self.select('Boot Manager')
        self.select('Remove Boot Device Entry')
        self.select(name)
        self.select('Return to main menu')

    def select(self, option, timeout=default_timeout):
        """
        Select the specified option from the current menu.

        :param option: Could be an ``int`` index of the option, or a string/regex to
                       match option text against.
        :param timeout: If a string option is specified, the option list may
                        need to be parsed (if it hasn't been already); this may
                        block, and the timeout is used to cap that.

        :raises LookupError: If a string option matches nothing in the menu.

        """
        if isinstance(option, string_types):
            option = self.get_option_index(option, timeout)
        self.enter(option)

    def enter(self, value, delay=load_delay):
        """Like ``select()`` except no resolution is performed -- the value is sent directly
        to the serial connection.

        :param value: The value to send; it is converted with ``str()``.
        :param delay: Timeout, in seconds, used while waiting for the start of
                      the value to appear on the connection.

        """
        # Empty the buffer first, so that only response to the input about to
        # be sent will be processed by subsequent commands.
        value = str(value)
        self._reset()
        write_characters(self.conn, value)
        # TODO: in case the value is long and complicated, things may get
        # screwed up (e.g. there may be line breaks injected). To limit that,
        # only the first 5 chars of the value are matched. This is entirely
        # arbitrary and a better way should be found at some point.
        # expect() compiles strings as regular expressions, so the prefix is
        # escaped to make special characters match literally.
        self.conn.expect(re.escape(value[:5]), timeout=delay)
        time.sleep(self.load_delay)

    def read_menu(self, timeout=default_timeout):
        """Parse serial output to get the menu options and the following prompt.

        Options are accumulated into ``self.options`` (index text -> option
        text) until a prompt line is seen, which is stored in ``self.prompt``.

        :raises RuntimeError: If the menu cannot be read after one retry.

        """
        attempting_timeout_retry = False
        self.attempting_invalid_retry = False
        patterns = [self.option_regex, self.prompt_regex, self.invalid_regex, TIMEOUT]
        while True:
            index = self.conn.expect(patterns, timeout=timeout)
            match = self.conn.match
            if index == 0:  # matched menu option
                self.options[match.group(1)] = match.group(2)
            elif index == 1:  # matched prompt
                self.prompt = match.group(1)
                break
            elif index == 2:  # matched invalid selection
                # We've sent an invalid input (which includes an empty line) at
                # the top-level menu. To get back the menu options, it seems we
                # need to enter what the error reports as the max + 1, so...
                # NOTE(review): the value sent below is the reported max
                # itself, not max + 1 -- confirm which one is intended.
                if not self.attempting_invalid_retry:
                    self.attempting_invalid_retry = True
                    val = int(match.group(1))
                    self.empty_buffer()
                    self.enter(val)
                    # NOTE(review): select() may re-enter read_menu(), which
                    # clears attempting_invalid_retry -- confirm the guard
                    # still works as intended in that case.
                    self.select('Return to main menu')
                    self.attempting_invalid_retry = False
                else:  # OK, that didn't work; panic!
                    raise RuntimeError('Could not read menu entries stuck on "{}" prompt'.format(self.prompt))
            elif index == 3:  # timed out
                if not attempting_timeout_retry:
                    attempting_timeout_retry = True
                    self.nudge()
                else:  # Didn't help. Run away!
                    raise RuntimeError('Did not see a valid UEFI menu.')
            else:
                raise AssertionError('Unexpected response waiting for UEFI menu')  # should never get here

    def list_options(self, timeout=default_timeout):
        """Returns a list of ``(index, text)`` pairs for the options in the
        current menu, reading the menu first if it has not been parsed yet."""
        if not self.prompt:
            self.read_menu(timeout)
        # list() keeps the return type a list on Python 3 as well.
        return list(self.options.items())

    def get_option_index(self, text, timeout=default_timeout):
        """Returns the menu index of the specified option text (uses regex matching). If the option
        is not in the current menu, ``LookupError`` will be raised."""
        if not self.prompt:
            self.read_menu(timeout)
        for index, option_text in self.options.items():
            if re.search(text, option_text):
                return index
        raise LookupError(text)

    def has_option(self, text, timeout=default_timeout):
        """Returns ``True`` if at least one of the options in the current menu has
        matched (using regex) the specified text."""
        try:
            self.get_option_index(text, timeout)
        except LookupError:
            return False
        return True

    def nudge(self):
        """Send a little nudge to ensure there is something to read. This is useful when you're not
        sure if all output from the serial has been read already."""
        self.enter('')

    def empty_buffer(self):
        """Read everything from the serial and clear the internal pexpect buffer. This ensures
        that the next ``expect()`` call will time out (unless further input will be sent to the
        serial beforehand). This is used to create a "known" state and avoid unexpected matches."""
        try:
            # Keep draining until a read times out, i.e. nothing is pending.
            while True:
                time.sleep(0.1)
                self.conn.read_nonblocking(size=1024, timeout=0.1)
        except TIMEOUT:
            pass
        self.conn.buffer = ''

    def _reset(self):
        """Forget the parsed options and prompt, and drain the connection."""
        self.options = {}
        self.prompt = None
        self.empty_buffer()