Home | History | Annotate | Download | only in system_tests
      1 #/usr/bin/env python3.4
      2 #
      3 # Copyright (C) 2016 The Android Open Source Project
      4 #
      5 # Licensed under the Apache License, Version 2.0 (the "License"); you may not
      6 # use this file except in compliance with the License. You may obtain a copy of
      7 # the License at
      8 #
      9 # http://www.apache.org/licenses/LICENSE-2.0
     10 #
     11 # Unless required by applicable law or agreed to in writing, software
     12 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     13 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
     14 # License for the specific language governing permissions and limitations under
     15 # the License.
     16 """
     17 Test script to execute Bluetooth basic functionality test cases.
     18 This test was designed to be run in a shield box.
     19 """
     20 
     21 import threading
     22 import time
     23 from contextlib import suppress
     24 from random import randint
     25 
     26 from queue import Empty
     27 from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
     28 from acts.test_utils.bt.bt_test_utils import reset_bluetooth
     29 from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
     30 from acts.test_utils.bt.bt_test_utils import rfcomm_accept
     31 from acts.test_utils.bt.bt_test_utils import rfcomm_connect
     32 from acts.test_utils.bt.bt_test_utils import take_btsnoop_logs
     33 from acts.test_utils.bt.bt_test_utils import write_read_verify_data
     34 
     35 class RfcommLongevityTest(BluetoothBaseTest):
     36     default_timeout = 10
     37     longev_iterations = 200
     38     thread_list = []
     39     generic_message = (
     40         "Space: the final frontier. These are the voyages of "
     41         "the starship Enterprise. Its continuing mission: to explore "
     42         "strange new worlds, to seek out new life and new civilizations,"
     43         " to boldly go where no man has gone before.")
     44 
     45     def __init__(self, controllers):
     46         BluetoothBaseTest.__init__(self, controllers)
     47         self.client_ad = self.android_devices[0]
     48         self.server_ad = self.android_devices[1]
     49 
     50     def on_fail(self, test_name, begin_time):
     51         take_btsnoop_logs(self.android_devices, self, test_name)
     52         for _ in range(5):
     53             if reset_bluetooth(self.android_devices):
     54                 break
     55             else:
     56                 self.log.info("Failed to reset bluetooth state, retrying...")
     57 
     58     def teardown_test(self):
     59         with suppress(Exception):
     60             for thread in self.thread_list:
     61                 thread.join()
     62         for _ in range(5):
     63             if reset_bluetooth(self.android_devices):
     64                 break
     65             else:
     66                 self.log.info("Failed to reset bluetooth state, retrying...")
     67         time.sleep(20) #safeguard in case of connId errors
     68 
     69     def orchestrate_rfcomm_connect(self, server_mac):
     70         accept_thread = threading.Thread(target=rfcomm_accept,
     71                                          args=(self.server_ad, ))
     72         self.thread_list.append(accept_thread)
     73         accept_thread.start()
     74         connect_thread = threading.Thread(
     75             target=rfcomm_connect,
     76             args=(self.client_ad, server_mac))
     77         self.thread_list.append(connect_thread)
     78         connect_thread.start()
     79 
     80     def test_rfcomm_longev_read_write_message(self):
     81         """Longevity test an RFCOMM connection's I/O with a generic message
     82 
     83         Test the longevity of RFCOMM with a basic read/write
     84         connect/disconnect sequence.
     85 
     86         Steps:
     87         1. Establish a bonding between two Android devices.
     88         2. Write data to RFCOMM from the client droid.
     89         3. Read data from RFCOMM from the server droid.
     90         4. Verify data written matches data read.
     91         5. Repeat steps 2-4 5000 times.
     92         6. Disconnect RFCOMM connection.
     93         7. Repeat steps 1-6 1000 times.
     94 
     95         Expected Result:
     96         Each iteration should read and write to the RFCOMM connection
     97         successfully. Each connect and disconnect should be successful.
     98 
     99         Returns:
    100           Pass if True
    101           Fail if False
    102 
    103         TAGS: Classic, Longevity, RFCOMM
    104         Priority: 2
    105         """
    106         server_mac = self.server_ad.droid.bluetoothGetLocalAddress()
    107         write_iterations = 5000
    108         for i in range(self.longev_iterations):
    109             self.log.info("iteration {} connection".format(i))
    110             self.orchestrate_rfcomm_connect(server_mac)
    111             for n in range(write_iterations):
    112                 self.log.info("iteration {} data".format(((n + 1) + (
    113                     i * write_iterations))))
    114                 if not write_read_verify_data(self.client_ad, self.server_ad,
    115                     self.generic_message, False):
    116                     return False
    117                 self.log.info("Iteration {} completed".format(n))
    118             self.client_ad.droid.bluetoothRfcommStop()
    119             self.server_ad.droid.bluetoothRfcommStop()
    120         for t in self.thread_list:
    121             t.join()
    122         self.thread_list.clear()
    123         return True
    124 
    125     def test_rfcomm_longev_read_write_small_message(self):
    126         """Longevity test an RFCOMM connection's I/O with a small message
    127 
    128         Test the longevity of RFCOMM with a basic read/write
    129         connect/disconnect sequence. The data being transfered is only
    130         one character in size.
    131 
    132         Steps:
    133         1. Establish a bonding between two Android devices.
    134         2. Write data to RFCOMM from the client droid.
    135         3. Read data from RFCOMM from the server droid.
    136         4. Verify data written matches data read.
    137         5. Repeat steps 2-4 5000 times.
    138         6. Disconnect RFCOMM connection.
    139         7. Repeat steps 1-6 1000 times.
    140 
    141         Expected Result:
    142         Each iteration should read and write to the RFCOMM connection
    143         successfully. Each connect and disconnect should be successful.
    144 
    145         Returns:
    146           Pass if True
    147           Fail if False
    148 
    149         TAGS: Classic, Longevity, RFCOMM
    150         Priority: 2
    151         """
    152         server_mac = self.server_ad.droid.bluetoothGetLocalAddress()
    153         message = "x"
    154         write_iterations = 5000
    155         for i in range(self.longev_iterations):
    156             self.log.info("iteration {} connection".format(i))
    157             self.orchestrate_rfcomm_connect(server_mac)
    158             for n in range(write_iterations):
    159                 self.log.info("iteration {} data".format(((n + 1) + (
    160                     i * write_iterations))))
    161                 if not write_read_verify_data(self.client_ad, self.server_ad,
    162                     message, False):
    163                     return False
    164                 self.log.info("Iteration {} completed".format(n))
    165             self.client_ad.droid.bluetoothRfcommStop()
    166             self.server_ad.droid.bluetoothRfcommStop()
    167         for t in self.thread_list:
    168             t.join()
    169         self.thread_list.clear()
    170         return True
    171 
    172     def test_rfcomm_longev_read_write_binary_message(self):
    173         """Longevity test an RFCOMM connection's I/O with a binary message
    174 
    175         Test the longevity of RFCOMM with a basic read/write
    176         connect/disconnect sequence. The data being transfered is in a
    177         binary format.
    178 
    179         Steps:
    180         1. Establish a bonding between two Android devices.
    181         2. Write data to RFCOMM from the client droid.
    182         3. Read data from RFCOMM from the server droid.
    183         4. Verify data written matches data read.
    184         5. Repeat steps 2-4 5000 times.
    185         6. Disconnect RFCOMM connection.
    186         7. Repeat steps 1-6 1000 times.
    187 
    188         Expected Result:
    189         Each iteration should read and write to the RFCOMM connection
    190         successfully. Each connect and disconnect should be successful.
    191 
    192         Returns:
    193           Pass if True
    194           Fail if False
    195 
    196         TAGS: Classic, Longevity, RFCOMM
    197         Priority: 2
    198         """
    199         server_mac = self.server_ad.droid.bluetoothGetLocalAddress()
    200         binary_message = "11010101"
    201         write_iterations = 5000
    202         for i in range(self.longev_iterations):
    203             self.log.info("iteration {} connection".format(i))
    204             self.orchestrate_rfcomm_connect(server_mac)
    205             for n in range(write_iterations):
    206                 self.log.info("iteration {} data".format(((n + 1) + (
    207                     i * write_iterations))))
    208                 if not write_read_verify_data(self.client_ad, self.server_ad,
    209                     binary_message, True):
    210                     return False
    211                 self.log.info("Iteration {} completed".format(n))
    212             self.client_ad.droid.bluetoothRfcommStop()
    213             self.server_ad.droid.bluetoothRfcommStop()
    214         for t in self.thread_list:
    215             t.join()
    216         self.thread_list.clear()
    217         return True
    218 
    219     def test_rfcomm_longev_read_write_large_message(self):
    220         """Longevity test an RFCOMM connection's I/O with a large message
    221 
    222         Test the longevity of RFCOMM with a basic read/write
    223         connect/disconnect sequence. The data being transfered is 990 chars
    224         in size.
    225 
    226         Steps:
    227         1. Establish a bonding between two Android devices.
    228         2. Write data to RFCOMM from the client droid.
    229         3. Read data from RFCOMM from the server droid.
    230         4. Verify data written matches data read.
    231         5. Repeat steps 2-4 5000 times.
    232         6. Disconnect RFCOMM connection.
    233         7. Repeat steps 1-6 1000 times.
    234 
    235         Expected Result:
    236         Each iteration should read and write to the RFCOMM connection
    237         successfully. Each connect and disconnect should be successful.
    238 
    239         Returns:
    240           Pass if True
    241           Fail if False
    242 
    243         TAGS: Classic, Longevity, RFCOMM
    244         Priority: 2
    245         """
    246         server_mac = self.server_ad.droid.bluetoothGetLocalAddress()
    247         message = "x" * 990  #largest message size till sl4a fixed
    248         write_iterations = 5000
    249         for i in range(self.longev_iterations):
    250             self.log.info("iteration {} connection".format(i))
    251             self.orchestrate_rfcomm_connect(server_mac)
    252             for n in range(write_iterations):
    253                 self.log.info("iteration {} data".format(((n + 1) + (
    254                     i * write_iterations))))
    255                 if not write_read_verify_data(self.client_ad, self.server_ad,
    256                     message, False):
    257                     return False
    258                 self.log.info("Iteration {} completed".format(n))
    259             self.client_ad.droid.bluetoothRfcommStop()
    260             self.server_ad.droid.bluetoothRfcommStop()
    261         for t in self.thread_list:
    262             t.join()
    263         self.thread_list.clear()
    264         return True
    265 
    266     def test_rfcomm_longev_connection_interuption(self):
    267         """Longevity test an RFCOMM connection's with socket interuptions
    268 
    269         Test the longevity of RFCOMM with a basic read/write
    270         connect/disconnect sequence. Randomly in the sequence of reads and
    271         writes the socket on the client side will close. There should be
    272         an exception thrown for writing the next set of data and the
    273         test should start up a new connection and continue.
    274 
    275         Steps:
    276         1. Establish a bonding between two Android devices.
    277         2. Write data to RFCOMM from the client droid.
    278         3. Read data from RFCOMM from the server droid.
    279         4. Verify data written matches data read.
    280         5. Repeat steps 2-4 5000 times or until the random interupt occurs.
    281         6. Re-establish an RFCOMM connection.
    282         7. Repeat steps 1-6 1000 times.
    283 
    284         Expected Result:
    285         Each iteration should read and write to the RFCOMM connection
    286         successfully. Each connect and disconnect should be successful.
    287         Devices should recover a new connection after each interruption.
    288 
    289         Returns:
    290           Pass if True
    291           Fail if False
    292 
    293         TAGS: Classic, Longevity, RFCOMM
    294         Priority: 2
    295         """
    296         server_mac = self.server_ad.droid.bluetoothGetLocalAddress()
    297         write_iterations = 5000
    298         for i in range(self.longev_iterations):
    299             try:
    300                 self.log.info("iteration {} connection".format(i))
    301                 self.orchestrate_rfcomm_connect(server_mac)
    302                 random_interup_iteration = randint(0, write_iterations)
    303                 for n in range(write_iterations):
    304                     self.log.info("iteration {} data".format(((n + 1) + (
    305                         i * write_iterations))))
    306                     if not write_read_verify_data(self.client_ad, self.server_ad,
    307                         self.generic_message, False):
    308                         return False
    309                     self.log.info("Iteration {} completed".format(n))
    310                     if n > random_interup_iteration:
    311                         self.client_ad.droid.bluetoothRfcommCloseSocket()
    312                 self.client_ad.droid.bluetoothRfcommStop()
    313                 self.server_ad.droid.bluetoothRfcommStop()
    314             except Exception:
    315                 self.log.info("Exception found as expected. Continuing...")
    316                 try:
    317                     self.client_ad.droid.bluetoothRfcommStop()
    318                 except Exception as err:
    319                     self.log.error(
    320                         "Error closing client connection: {}".format(err))
    321                     return False
    322                 try:
    323                     self.server_ad.droid.bluetoothRfcommStop()
    324                 except Exception as err:
    325                     self.log.error(
    326                         "Error closing server connection: {}".format(err))
    327                     return False
    328                 for t in self.thread_list:
    329                     t.join()
    330                 self.thread_list.clear()
    331         for t in self.thread_list:
    332             t.join()
    333         self.thread_list.clear()
    334         return True
    335 
    336     def test_rfcomm_longev_data_elasticity(self):
    337         """Longevity test an RFCOMM connection's I/O with changing data size
    338 
    339         Test the longevity of RFCOMM with a basic read/write
    340         connect/disconnect sequence. The data being transfered changes
    341         in size after each write/read sequence to increase up to 990
    342         chars in size and decrease down to 1 in size. This repeats through
    343         the entire test in order to exercise different size values being
    344         written.
    345 
    346         Steps:
    347         1. Establish a bonding between two Android devices.
    348         2. Write data to RFCOMM from the client droid.
    349         3. Read data from RFCOMM from the server droid.
    350         4. Verify data written matches data read.
    351         5. Change data size according to above description.
    352         6. Repeat steps 2-5 5000 times.
    353         7. Disconnect RFCOMM connection.
    354         8. Repeat steps 1-6 1000 times.
    355 
    356         Expected Result:
    357         Each iteration should read and write to the RFCOMM connection
    358         successfully. Each connect and disconnect should be successful.
    359 
    360         Returns:
    361           Pass if True
    362           Fail if False
    363 
    364         TAGS: Classic, Longevity, RFCOMM
    365         Priority: 2
    366         """
    367         server_mac = self.server_ad.droid.bluetoothGetLocalAddress()
    368         message = "x"
    369         resize_toggle = 1
    370         write_iterations = 5000
    371         for i in range(self.longev_iterations):
    372             try:
    373                 self.log.info("iteration {} connection".format(i))
    374                 self.orchestrate_rfcomm_connect(server_mac)
    375                 for n in range(write_iterations):
    376                     self.log.info("iteration {} data".format(((n + 1) + (
    377                         i * write_iterations))))
    378                     if not write_read_verify_data(self.client_ad, self.server_ad,
    379                         message, False):
    380                         return False
    381                     self.log.info("Iteration {} completed".format(n))
    382                     size_of_message = len(message)
    383                     #max size is 990 due to a bug in sl4a.
    384                     if size_of_message >= 990:
    385                         resize_toggle = 0
    386                     elif size_of_message <= 1:
    387                         resize_toggle = 1
    388                     if resize_toggle == 0:
    389                         message = "x" * (size_of_message - 1)
    390                     else:
    391                         message = "x" * (size_of_message + 1)
    392                 self.client_ad.droid.bluetoothRfcommStop()
    393                 self.server_ad.droid.bluetoothRfcommStop()
    394             except Exception as err:
    395                 self.log.info("Error in longevity test: {}".format(err))
    396                 for t in self.thread_list:
    397                     t.join()
    398                 self.thread_list.clear()
    399                 return False
    400 
    401         for t in self.thread_list:
    402             t.join()
    403         self.thread_list.clear()
    404         return True
    405