Home | History | Annotate | Download | only in utils
      1 #    Copyright 2014-2015 ARM Limited
      2 #
      3 # Licensed under the Apache License, Version 2.0 (the "License");
      4 # you may not use this file except in compliance with the License.
      5 # You may obtain a copy of the License at
      6 #
      7 #     http://www.apache.org/licenses/LICENSE-2.0
      8 #
      9 # Unless required by applicable law or agreed to in writing, software
     10 # distributed under the License is distributed on an "AS IS" BASIS,
     11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 # See the License for the specific language governing permissions and
     13 # limitations under the License.
     14 #
     15 
     16 
     17 import re
     18 import time
     19 import logging
     20 from copy import copy
     21 
     22 from devlib.utils.serial_port import write_characters, TIMEOUT
     23 from devlib.utils.types import boolean
     24 
     25 
     26 logger = logging.getLogger('UEFI')
     27 
     28 
     29 class UefiConfig(object):
     30 
     31     def __init__(self, config_dict):
     32         if isinstance(config_dict, UefiConfig):
     33             self.__dict__ = copy(config_dict.__dict__)
     34         else:
     35             try:
     36                 self.image_name = config_dict['image_name']
     37                 self.image_args = config_dict['image_args']
     38                 self.fdt_support = boolean(config_dict['fdt_support'])
     39             except KeyError as e:
     40                 raise ValueError('Missing mandatory parameter for UEFI entry config: "{}"'.format(e))
     41             self.initrd = config_dict.get('initrd')
     42             self.fdt_path = config_dict.get('fdt_path')
     43             if self.fdt_path and not self.fdt_support:
     44                 raise ValueError('FDT path has been specfied for UEFI entry, when FDT support is "False"')
     45 
     46 
     47 class UefiMenu(object):
     48     """
     49     Allows navigating UEFI menu over serial (it relies on a pexpect connection).
     50 
     51     """
     52 
     53     option_regex = re.compile(r'^\[(\d+)\]\s+([^\r]+)\r\n', re.M)
     54     prompt_regex = re.compile(r'^(\S[^\r\n]+):\s*', re.M)
     55     invalid_regex = re.compile(r'Invalid input \(max (\d+)\)', re.M)
     56 
     57     load_delay = 1  # seconds
     58     default_timeout = 60  # seconds
     59 
     60     def __init__(self, conn, prompt='The default boot selection will start in'):
     61         """
     62         :param conn: A serial connection as returned by ``pexect.spawn()``.
     63         :param prompt: The starting prompt to wait for during ``open()``.
     64 
     65         """
     66         self.conn = conn
     67         self.start_prompt = prompt
     68         self.options = {}
     69         self.prompt = None
     70         self.attempting_invalid_retry = False
     71 
     72     def wait(self, timeout=default_timeout):
     73         """
     74         "Open" the UEFI menu by sending an interrupt on STDIN after seeing the
     75         starting prompt (configurable upon creation of the ``UefiMenu`` object.
     76 
     77         """
     78         self.conn.expect(self.start_prompt, timeout)
     79         self.connect()
     80 
     81     def connect(self, timeout=default_timeout):
     82         self.nudge()
     83         time.sleep(self.load_delay)
     84         self.read_menu(timeout=timeout)
     85 
     86     def create_entry(self, name, config):
     87         """Create a new UEFI entry using the parameters. The menu is assumed
     88         to be at the top level. Upon return, the menu will be at the top level."""
     89         logger.debug('Creating UEFI entry {}'.format(name))
     90         self.nudge()
     91         self.select('Boot Manager')
     92         self.select('Add Boot Device Entry')
     93         self.select('NOR Flash')
     94         self.enter(config.image_name)
     95         self.enter('y' if config.fdt_support else 'n')
     96         if config.initrd:
     97             self.enter('y')
     98             self.enter(config.initrd)
     99         else:
    100             self.enter('n')
    101         self.enter(config.image_args)
    102         self.enter(name)
    103 
    104         if config.fdt_path:
    105             self.select('Update FDT path')
    106             self.enter(config.fdt_path)
    107 
    108         self.select('Return to main menu')
    109 
    110     def delete_entry(self, name):
    111         """Delete the specified UEFI entry. The menu is assumed
    112         to be at the top level. Upon return, the menu will be at the top level."""
    113         logger.debug('Removing UEFI entry {}'.format(name))
    114         self.nudge()
    115         self.select('Boot Manager')
    116         self.select('Remove Boot Device Entry')
    117         self.select(name)
    118         self.select('Return to main menu')
    119 
    120     def select(self, option, timeout=default_timeout):
    121         """
    122         Select the specified option from the current menu.
    123 
    124         :param option: Could be an ``int`` index of the option, or a string/regex to
    125                        match option text against.
    126         :param timeout: If a non-``int`` option is specified, the option list may need
    127                         need to be parsed (if it hasn't been already), this may block
    128                         and the timeout is used to cap that , resulting in a ``TIMEOUT``
    129                         exception.
    130         :param delay: A fixed delay to wait after sending the input to the serial connection.
    131                       This should be set if input this action is known to result in a
    132                       long-running operation.
    133 
    134         """
    135         if isinstance(option, basestring):
    136             option = self.get_option_index(option, timeout)
    137         self.enter(option)
    138 
    139     def enter(self, value, delay=load_delay):
    140         """Like ``select()`` except no resolution is performed -- the value is sent directly
    141         to the serial connection."""
    142         # Empty the buffer first, so that only response to the input about to
    143         # be sent will be processed by subsequent commands.
    144         value = str(value)
    145         self._reset()
    146         write_characters(self.conn, value)
    147         # TODO: in case the value is long an complicated, things may get
    148         # screwed up (e.g. there may be line breaks injected), additionally,
    149         # special chars might cause regex to fail. To avoid these issues i'm
    150         # only matching against the first 5 chars of the value. This is
    151         # entirely arbitrary and I'll probably have to find a better way of
    152         # doing this at some point.
    153         self.conn.expect(value[:5], timeout=delay)
    154         time.sleep(self.load_delay)
    155 
    156     def read_menu(self, timeout=default_timeout):
    157         """Parse serial output to get the menu options and the following prompt."""
    158         attempting_timeout_retry = False
    159         self.attempting_invalid_retry = False
    160         while True:
    161             index = self.conn.expect([self.option_regex, self.prompt_regex, self.invalid_regex, TIMEOUT],
    162                                      timeout=timeout)
    163             match = self.conn.match
    164             if index == 0:  # matched menu option
    165                 self.options[match.group(1)] = match.group(2)
    166             elif index == 1:  # matched prompt
    167                 self.prompt = match.group(1)
    168                 break
    169             elif index == 2:  # matched invalid selection
    170                 # We've sent an invalid input (which includes an empty line) at
    171                 # the top-level menu. To get back the menu options, it seems we
    172                 # need to enter what the error reports as the max + 1, so...
    173                 if not self.attempting_invalid_retry:
    174                     self.attempting_invalid_retry = True
    175                     val = int(match.group(1))
    176                     self.empty_buffer()
    177                     self.enter(val)
    178                     self.select('Return to main menu')
    179                     self.attempting_invalid_retry = False
    180                 else:   # OK, that didn't work; panic!
    181                     raise RuntimeError('Could not read menu entries stuck on "{}" prompt'.format(self.prompt))
    182             elif index == 3:  # timed out
    183                 if not attempting_timeout_retry:
    184                     attempting_timeout_retry = True
    185                     self.nudge()
    186                 else:  # Didn't help. Run away!
    187                     raise RuntimeError('Did not see a valid UEFI menu.')
    188             else:
    189                 raise AssertionError('Unexpected response waiting for UEFI menu')  # should never get here
    190 
    191     def list_options(self, timeout=default_timeout):
    192         """Returns the menu index of the specified option text (uses regex matching). If the option
    193         is not in the current menu, ``LookupError`` will be raised."""
    194         if not self.prompt:
    195             self.read_menu(timeout)
    196         return self.options.items()
    197 
    198     def get_option_index(self, text, timeout=default_timeout):
    199         """Returns the menu index of the specified option text (uses regex matching). If the option
    200         is not in the current menu, ``LookupError`` will be raised."""
    201         if not self.prompt:
    202             self.read_menu(timeout)
    203         for k, v in self.options.iteritems():
    204             if re.search(text, v):
    205                 return k
    206         raise LookupError(text)
    207 
    208     def has_option(self, text, timeout=default_timeout):
    209         """Returns ``True`` if at least one of the options in the current menu has
    210         matched (using regex) the specified text."""
    211         try:
    212             self.get_option_index(text, timeout)
    213             return True
    214         except LookupError:
    215             return False
    216 
    217     def nudge(self):
    218         """Send a little nudge to ensure there is something to read. This is useful when you're not
    219         sure if all out put from the serial has been read already."""
    220         self.enter('')
    221 
    222     def empty_buffer(self):
    223         """Read everything from the serial and clear the internal pexpect buffer. This ensures
    224         that the next ``expect()`` call will time out (unless further input will be sent to the
    225         serial beforehand. This is used to create a "known" state and avoid unexpected matches."""
    226         try:
    227             while True:
    228                 time.sleep(0.1)
    229                 self.conn.read_nonblocking(size=1024, timeout=0.1)
    230         except TIMEOUT:
    231             pass
    232         self.conn.buffer = ''
    233 
    234     def _reset(self):
    235         self.options = {}
    236         self.prompt = None
    237         self.empty_buffer()
    238 
    239 
    240