# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""This module provides MTB parser and related packet methods."""

import copy
import logging
import math
import os
import re
import sys

from collections import defaultdict, namedtuple, OrderedDict

from firmware_constants import AXIS, GV, MTB, UNIT, VAL
from geometry.elements import Point
from geometry.two_farthest_clusters import (
        get_radii_of_two_minimal_enclosing_circles as get_two_min_radii,
        get_two_farthest_points
)
sys.path.append('../../bin/input')
from linux_input import *


# Define TidPacket to keep the point, pressure, and SYN_REPORT time of a packet.
TidPacket = namedtuple('TidPacket', ['syn_time', 'point', 'pressure'])


# Define FingerPath class to keep track of the slot, and a list of tid packets
# of a finger (i.e., a tracking ID).
class FingerPath(namedtuple('FingerPath', ['slot', 'tid_packets'])):
    """This keeps the slot number and the list of tid packets of a finger."""
    __slots__ = ()

    def get(self, attr):
        """Get the list of the specified attribute, attr (i.e., point,
        pressure, or syn_time), of TidPacket for the finger.

        @param attr: the name of a TidPacket field
        """
        return [getattr(tid_packet, attr) for tid_packet in self.tid_packets]


def get_mtb_packets_from_file(event_file):
    """A helper function to get mtb packets by parsing the event file.

    @param event_file: an mtb_event file

    Return an Mtb instance built from the parsed packets.
    """
    return Mtb(packets=MtbParser().parse_file(event_file))


def make_pretty_packet(packet):
    """Convert the event list in a packet to a pretty format.

    @param packet: a list of event dictionaries

    Return one string with one line per event.
    """
    pretty_packet = []
    for event in packet:
        pretty_event = []
        pretty_event.append('Event:')
        pretty_event.append('time %.6f,' % event[MTB.EV_TIME])
        if event.get(MTB.SYN_REPORT):
            pretty_event.append('-------------- SYN_REPORT ------------\n')
        else:
            ev_type = event[MTB.EV_TYPE]
            pretty_event.append('type %d (%s),' % (ev_type, EV_TYPES[ev_type]))
            ev_code = event[MTB.EV_CODE]
            pretty_event.append('code %d (%s),' %
                                (ev_code, EV_STRINGS[ev_type][ev_code]))
            pretty_event.append('value %d' % event[MTB.EV_VALUE])
        pretty_packet.append(' '.join(pretty_event))
    return '\n'.join(pretty_packet)


def convert_to_evemu_format(packets):
    """Convert the text event format to the evemu format.

    @param packets: a list of packets; every packet is a list of event dicts

    Return a flat list of evemu event strings, one per event.
    """
    evemu_output = []
    evemu_format = 'E: %.6f %04x %04x %d'
    evemu_format_syn_report = 'E: %.6f 0000 0000 0'
    for packet in packets:
        for event in packet:
            if event.get(MTB.SYN_REPORT):
                evemu_event = evemu_format_syn_report % event[MTB.EV_TIME]
            else:
                evemu_event = evemu_format % (event[MTB.EV_TIME],
                                              event[MTB.EV_TYPE],
                                              event[MTB.EV_CODE],
                                              event[MTB.EV_VALUE])
            evemu_output.append(evemu_event)
    return evemu_output


def convert_mtplot_file_to_evemu_file(mtplot_file, evemu_dir=None,
                                      evemu_ext='.evemu', force=False):
    """Convert a mtplot event file to an evemu event file.

    Example:
       'one_finger_swipe.dat' is converted to 'one_finger_swipe.evemu.dat'

    @param mtplot_file: the path of the mtplot event file to convert
    @param evemu_dir: the directory to hold the evemu file; the directory
            of mtplot_file is used if this is not specified.
    @param evemu_ext: the extension inserted before the original extension
    @param force: overwrite an existing evemu file if set to True

    Return the path of the evemu file, or None if mtplot_file does not
    exist, the evemu file already exists and force is not set, or the
    evemu file cannot be written.
    """
    # The messages are printed with a single parenthesized argument so that
    # the statements are valid for both python 2 and python 3.
    if not os.path.isfile(mtplot_file):
        print('Error: there is no such file: "%s".' % mtplot_file)
        return None

    # Convert mtplot event format to evemu event format.
    mtplot_packets = MtbParser().parse_file(mtplot_file)
    evemu_packets = convert_to_evemu_format(mtplot_packets)

    # Create the evemu file from the mtplot file.
    mtplot_dir, mtplot_filename = os.path.split(mtplot_file)
    mtplot_basename, mtplot_ext = os.path.splitext(mtplot_filename)

    evemu_dir = evemu_dir if evemu_dir else mtplot_dir
    evemu_file = (os.path.join(evemu_dir, mtplot_basename) + evemu_ext +
                  mtplot_ext)

    # Make sure that the file to be created does not exist yet unless force flag
    # is set to be True.
    if os.path.isfile(evemu_file) and not force:
        print('Warning: the "%s" already exists. Quit.' % evemu_file)
        return None

    # Write the converted evemu events to the evemu file.
    try:
        with open(evemu_file, 'w') as evemu_f:
            evemu_f.write('\n'.join(evemu_packets))
    except Exception:
        print('Error: cannot write data to %s' % evemu_file)
        return None

    return evemu_file


def create_final_state_packet(packets):
    """Given a sequence of packets, generate a packet representing
    the final state of events

    @param packets: a list of packets; every packet is a list of event dicts

    Return a list of event dicts; the list is empty if no finger remains.
    """
    def try_to_add(packet, event):
        """Try to add an event, if its value is not None, into the packet."""
        _, _, _, value = event
        if value is not None:
            packet.append(MtbParser.make_ev_dict(event))

    # Put the packets through a state machine to get the
    # final state of events
    sm = MtbStateMachine()
    for packet in packets:
        for event in packet:
            sm.add_event(event)
        # The returned tid data are not needed here. The call is made after
        # every packet because it is what removes the slots of the fingers
        # that left in this packet from the state machine.
        sm.get_current_tid_data_for_all_tids()

    # Create the dummy packet representing the final state. The snapshot of
    # the state machine is used so that a slot is still reported even if
    # not all of its events are populated (e.g. if a pressure or position
    # event is missing); the missing values are simply skipped.
    final_state_packet = []

    # It is possible that all fingers have left at this time instant.
    if sm.number_fingers == 0:
        return final_state_packet

    # Extract slot data from the snapshot of the state machine.
    syn_time = None
    for slot_data in sm.get_snapshot():
        syn_time, slot, tid, point, pressure = slot_data
        try_to_add(final_state_packet, (syn_time, EV_ABS, ABS_MT_SLOT, slot))
        try_to_add(final_state_packet,
                   (syn_time, EV_ABS, ABS_MT_TRACKING_ID, tid))
        try_to_add(final_state_packet,
                   (syn_time, EV_ABS, ABS_MT_POSITION_X, point.x))
        try_to_add(final_state_packet,
                   (syn_time, EV_ABS, ABS_MT_POSITION_Y, point.y))
        try_to_add(final_state_packet,
                   (syn_time, EV_ABS, ABS_MT_PRESSURE, pressure))

    # Add syn_report event to indicate the end of the packet.
    # Compare against None explicitly so that a time of 0 is not dropped.
    if syn_time is not None:
        final_state_packet.append(MtbParser.make_syn_report_ev_dict(syn_time))
    return final_state_packet


class MtbEvent:
    """Determine what an MTB event is.

    This class is just a bundle of a variety of classmethods about
    MTB event classification.
    """
    @classmethod
    def _is_abs_code(cls, event, ev_code):
        """Is this a non-SYN_REPORT EV_ABS event with the given code?"""
        return (not event.get(MTB.SYN_REPORT) and
                event[MTB.EV_TYPE] == EV_ABS and
                event[MTB.EV_CODE] == ev_code)

    @classmethod
    def is_ABS_MT_TRACKING_ID(cls, event):
        """Is this event ABS_MT_TRACKING_ID?"""
        return cls._is_abs_code(event, ABS_MT_TRACKING_ID)

    @classmethod
    def is_new_contact(cls, event):
        """Is this event generating a new contact (Tracking ID)?"""
        return cls.is_ABS_MT_TRACKING_ID(event) and event[MTB.EV_VALUE] != -1

    @classmethod
    def is_finger_leaving(cls, event):
        """Is the finger leaving in this event?"""
        return cls.is_ABS_MT_TRACKING_ID(event) and event[MTB.EV_VALUE] == -1

    @classmethod
    def is_ABS_MT_SLOT(cls, event):
        """Is this event ABS_MT_SLOT?"""
        return cls._is_abs_code(event, ABS_MT_SLOT)

    @classmethod
    def is_ABS_MT_POSITION_X(cls, event):
        """Is this event ABS_MT_POSITION_X?"""
        return cls._is_abs_code(event, ABS_MT_POSITION_X)

    @classmethod
    def is_ABS_MT_POSITION_Y(cls, event):
        """Is this event ABS_MT_POSITION_Y?"""
        return cls._is_abs_code(event, ABS_MT_POSITION_Y)

    @classmethod
    def is_ABS_MT_PRESSURE(cls, event):
        """Is this event ABS_MT_PRESSURE?"""
        return cls._is_abs_code(event, ABS_MT_PRESSURE)

    @classmethod
    def is_finger_data(cls, event):
        """Is this event a position (x or y) or a pressure event?"""
        return (cls.is_ABS_MT_POSITION_X(event) or
                cls.is_ABS_MT_POSITION_Y(event) or
                cls.is_ABS_MT_PRESSURE(event))

    @classmethod
    def is_EV_KEY(cls, event):
        """Is this an EV_KEY event?"""
        return (not event.get(MTB.SYN_REPORT) and event[MTB.EV_TYPE] == EV_KEY)

    @classmethod
    def is_BTN_LEFT(cls, event):
        """Is this event BTN_LEFT?"""
        return (cls.is_EV_KEY(event) and event[MTB.EV_CODE] == BTN_LEFT)

    @classmethod
    def is_BTN_LEFT_value(cls, event, value):
        """Is this event BTN_LEFT with value equal to the specified value?"""
        return (cls.is_BTN_LEFT(event) and event[MTB.EV_VALUE] == value)

    @classmethod
    def is_BTN_TOOL_FINGER(cls, event):
        """Is this event BTN_TOOL_FINGER?"""
        return (cls.is_EV_KEY(event) and
                event[MTB.EV_CODE] == BTN_TOOL_FINGER)

    @classmethod
    def is_BTN_TOOL_DOUBLETAP(cls, event):
        """Is this event BTN_TOOL_DOUBLETAP?"""
        return (cls.is_EV_KEY(event) and
                event[MTB.EV_CODE] == BTN_TOOL_DOUBLETAP)

    @classmethod
    def is_BTN_TOOL_TRIPLETAP(cls, event):
        """Is this event BTN_TOOL_TRIPLETAP?"""
        return (cls.is_EV_KEY(event) and
                event[MTB.EV_CODE] == BTN_TOOL_TRIPLETAP)

    @classmethod
    def is_BTN_TOOL_QUADTAP(cls, event):
        """Is this event BTN_TOOL_QUADTAP?"""
        return (cls.is_EV_KEY(event) and
                event[MTB.EV_CODE] == BTN_TOOL_QUADTAP)

    @classmethod
    def is_BTN_TOOL_QUINTTAP(cls, event):
        """Is this event BTN_TOOL_QUINTTAP?"""
        return (cls.is_EV_KEY(event) and
                event[MTB.EV_CODE] == BTN_TOOL_QUINTTAP)

    @classmethod
    def is_BTN_TOUCH(cls, event):
        """Is this event BTN_TOUCH?"""
        return (cls.is_EV_KEY(event) and
                event[MTB.EV_CODE] == BTN_TOUCH)

    @classmethod
    def is_SYN_REPORT(cls, event):
        """Determine if this event is SYN_REPORT.

        Note that the value stored under MTB.SYN_REPORT is returned as is;
        False is returned if the key does not exist.
        """
        return event.get(MTB.SYN_REPORT, False)


class MtbEvemu:
    """A simplified class provides MTB utilities for evemu event format."""
    def __init__(self, device):
        self.mtb = Mtb(device=device)
        self.num_tracking_ids = 0

    def _convert_event(self, event):
        """Convert a (tv_sec, tv_usec, type, code, value) event to an
        event dictionary with a floating-point time in seconds.
        """
        (tv_sec, tv_usec, ev_type, ev_code, ev_value) = event
        ev_time = float(tv_sec + tv_usec * 0.000001)
        return MtbParser.make_ev_dict((ev_time, ev_type, ev_code, ev_value))

    def all_fingers_leaving(self):
        """Is there no finger on the touch device?"""
        return self.num_tracking_ids <= 0

    def process_event(self, event):
        """Process the event and count existing fingers.

        @param event: a (tv_sec, tv_usec, type, code, value) tuple
        """
        converted_event = self._convert_event(event)
        if MtbEvent.is_new_contact(converted_event):
            self.num_tracking_ids += 1
        elif MtbEvent.is_finger_leaving(converted_event):
            self.num_tracking_ids -= 1


class MtbSanity:
    """Sanity check for MTB format correctness.

    The rules to check the sanity will accumulate over time. However, let's
    begin with some simple checks:
    - A finger should not leave before arriving. (Should not set -1 to the
      tracking ID before assigning a positive value to the slot first.)
    - Should not assign X, Y, or PRESSURE data to a slot without a positive
      tracking ID.
    """
    ERR_FINGER_LEAVE = 'FINGER_LEAVING_BEFORE_ARRIVING'
    ERR_ASSIGN_ATTR = 'ASSIGN_ATTRIBUTES_BEFORE_ARRIVING'

    def __init__(self, packets):
        self.packets = packets

    def check(self):
        """Do some sanity checks.

        Note that it takes care of the case that X and Y events may come earlier
        than the TRACKING ID event. We consider it as valid.

          Event: time, type 3 (EV_ABS), code 47 (ABS_MT_SLOT), value 1
          Event: time, type 3 (EV_ABS), code 53 (ABS_MT_POSITION_X), value 2632
          Event: time, type 3 (EV_ABS), code 54 (ABS_MT_POSITION_Y), value 288
          Event: time, type 3 (EV_ABS), code 57 (ABS_MT_TRACKING_ID), value 1017
          ...

        In this case, X and Y events are stored in tmp_errors. When TRACKING ID
        event shows up in the packet, it removes the errors associated with
        this slot 1.

        Return a dictionary mapping every error string to its count.
        """
        errors = {self.ERR_FINGER_LEAVE: 0, self.ERR_ASSIGN_ATTR: 0}

        def _errors_factory():
            # A new dictionary is built on every call, so the per-slot
            # counters never share state.
            return {self.ERR_FINGER_LEAVE: 0, self.ERR_ASSIGN_ATTR: 0}

        curr_slot_id = 0
        slot_to_tid = {curr_slot_id: -1}

        for packet in self.packets:
            # The errors are kept per slot, and are only committed to errors
            # when SYN_REPORT of the packet is seen.
            tmp_errors = defaultdict(_errors_factory)
            for event in packet:
                # slot event
                if MtbEvent.is_ABS_MT_SLOT(event):
                    curr_slot_id = event[MTB.EV_VALUE]
                    if curr_slot_id not in slot_to_tid:
                        slot_to_tid[curr_slot_id] = -1

                # finger arriving
                elif MtbEvent.is_new_contact(event):
                    slot_to_tid[curr_slot_id] = event[MTB.EV_VALUE]
                    if curr_slot_id in tmp_errors:
                        del tmp_errors[curr_slot_id]

                # finger leaving
                elif MtbEvent.is_finger_leaving(event):
                    if slot_to_tid.get(curr_slot_id, -1) == -1:
                        tmp_errors[curr_slot_id][self.ERR_FINGER_LEAVE] += 1
                    else:
                        slot_to_tid[curr_slot_id] = -1

                # access a slot attribute before finger arriving
                elif MtbEvent.is_finger_data(event):
                    if slot_to_tid.get(curr_slot_id, -1) == -1:
                        tmp_errors[curr_slot_id][self.ERR_ASSIGN_ATTR] += 1

                # At SYN_REPORT, accumulate how many errors occur.
                elif MtbEvent.is_SYN_REPORT(event):
                    for slot_errors in tmp_errors.values():
                        for err_string, err_count in slot_errors.items():
                            errors[err_string] += err_count

        return errors


class MtbStateMachine:
    """The state machine for MTB events.

    It traces the slots, tracking IDs, x coordinates, y coordinates, etc. If
    these values are not changed explicitly, the values are kept across events.

    Note that the kernel driver only reports what is changed. Due to its
    internal state machine, it is possible that either x or y in
    self.point[tid] is None initially even though the instance has been created.
    """
    def __init__(self):
        # Set the default slot to 0 as it may not be displayed in the MTB events
        #
        # Some abnormal event files may not display the tracking ID in the
        # beginning. To handle this situation, we need to initialize
        # the following variables:  slot_to_tid, point
        #
        # As an example, refer to the following event file which is one of
        # the golden samples with this problem.
        #   tests/data/stationary_finger_shift_with_2nd_finger_tap.dat
        self.tid = None
        self.slot = 0
        self.slot_to_tid = {self.slot: self.tid}
        self.point = {self.tid: Point()}
        self.pressure = {self.tid: None}
        self.syn_time = None
        self.new_tid = False
        self.number_fingers = 0
        self.leaving_slots = []

    def _del_leaving_slots(self):
        """Delete the leaving slots. Remove the slots and their tracking IDs."""
        for slot in self.leaving_slots:
            # A slot may be queued more than once, or may have never been
            # registered, in a malformed event file (e.g., a finger leaving
            # before arriving). Skip it instead of raising KeyError.
            if slot in self.slot_to_tid:
                del self.slot_to_tid[slot]
                self.number_fingers -= 1
        self.leaving_slots = []

    def _add_new_tracking_id(self, tid):
        """Bind tid to the current slot, and reset its point and pressure."""
        self.tid = tid
        self.slot_to_tid[self.slot] = self.tid
        self.new_tid = True
        self.point[self.tid] = Point()
        self.pressure[self.tid] = None
        self.number_fingers += 1

    def add_event(self, event):
        """Update the internal states with the event.

        @param event: an MTB event
        """
        self.new_tid = False

        # Switch the slot.
        if MtbEvent.is_ABS_MT_SLOT(event):
            self.slot = event[MTB.EV_VALUE]

        # Get a new tracking ID.
        elif MtbEvent.is_new_contact(event):
            self._add_new_tracking_id(event[MTB.EV_VALUE])

        # A slot is leaving.
        # Do not delete this slot until this last packet is reported.
        elif MtbEvent.is_finger_leaving(event):
            self.leaving_slots.append(self.slot)

        else:
            # Gracefully handle the case where we weren't given a tracking id
            # by using a default value for these mystery fingers
            if (self.slot not in self.slot_to_tid and
                MtbEvent.is_finger_data(event)):
                self._add_new_tracking_id(999999)

            # Update x value.
            if MtbEvent.is_ABS_MT_POSITION_X(event):
                self.point[self.slot_to_tid[self.slot]].x = event[MTB.EV_VALUE]

            # Update y value.
            elif MtbEvent.is_ABS_MT_POSITION_Y(event):
                self.point[self.slot_to_tid[self.slot]].y = event[MTB.EV_VALUE]

            # Update z value (pressure)
            elif MtbEvent.is_ABS_MT_PRESSURE(event):
                self.pressure[self.slot_to_tid[self.slot]] = event[MTB.EV_VALUE]

            # Use the SYN_REPORT time as the packet time
            elif MtbEvent.is_SYN_REPORT(event):
                self.syn_time = event[MTB.EV_TIME]

    def get_snapshot(self):
        """Get the snapshot of current data of all slots.

        Return a list of [syn_time, slot, tid, point, pressure], one entry
        per slot.
        """
        slots_data = []
        for slot, tid in self.slot_to_tid.items():
            slot_data = [self.syn_time, slot, tid,
                         self.point.get(tid), self.pressure.get(tid)]
            slots_data.append(slot_data)
        return slots_data

    def get_current_tid_data_for_all_tids(self, request_data_ready=True):
        """Get current packet's tid data including the point, the pressure, and
        the syn_time for all tids.

        As a side effect, the slots of the fingers that have left are removed
        from the state machine after the data are collected.

        @param request_data_ready: if set to true, it will not output
                current_tid_data until all data including x, y, pressure,
                syn_time, etc. in the packet have been assigned.

        Return a list of (tid, slot, tid_packet) sorted by tid and then by
        slot, where tid_packet is None if the data are not ready.
        """
        current_tid_data = []
        for slot, tid in self.slot_to_tid.items():
            point = copy.deepcopy(self.point.get(tid))
            pressure = self.pressure.get(tid)
            # Check if all attributes are non-None values.
            # Note: we cannot use
            #           all([all(point.value()), pressure, self.syn_time])
            #       E.g., for a point = (0, 300), it will return False
            #       which is not what we want. We want it to return False
            #       only when there are None values.
            values = list(point.value()) + [pressure, self.syn_time]
            data_ready = all(value is not None for value in values)

            if (not request_data_ready) or data_ready:
                tid_packet = TidPacket(self.syn_time, point, pressure)
            else:
                tid_packet = None
            # Even if tid_packet is None, we would like to report this tid so
            # that its client function get_ordered_finger_paths() could
            # construct an ordered dictionary correctly based on the tracking
            # ID.
            current_tid_data.append((tid, slot, tid_packet))

        self._del_leaving_slots()

        # The initial tid is None. Python 2 orders None before any integer,
        # while python 3 refuses to compare them. The key below produces the
        # python 2 order without comparing None against an integer. The slot
        # is unique for every entry, so tid_packet never takes part in the
        # ordering.
        return sorted(current_tid_data,
                      key=lambda data: (data[0] is not None, data[0], data[1]))


class Mtb:
    """An MTB class providing MTB format related utility methods."""
    LEN_MOVING_AVERAGE = 2
    LEVEL_JUMP_RATIO = 3
    LEVEL_JUMP_MAXIUM_ALLOWED = 10
    LEN_DISCARD = 5

    def __init__(self, device=None, packets=None):
        self.device = device
        self.packets = packets
        self._define_check_event_func_list()

    def _define_check_event_func_list(self):
        """Define event function lists for various event cycles below."""
        self.check_event_func_list = {}
        self.MAX_FINGERS = 5
        # One-finger touching the device should generate the following events:
        #     BTN_TOUCH, and BTN_TOOL_FINGER: 0 -> 1 -> 0
        self.check_event_func_list[1] = [MtbEvent.is_BTN_TOUCH,
                                         MtbEvent.is_BTN_TOOL_FINGER]

        # Two-finger touching the device should generate the following events:
        #     BTN_TOUCH, and BTN_TOOL_DOUBLETAP: 0 -> 1 -> 0
        self.check_event_func_list[2] = [MtbEvent.is_BTN_TOUCH,
                                         MtbEvent.is_BTN_TOOL_DOUBLETAP]

        # Three-finger touching the device should generate the following events:
        #     BTN_TOUCH, and BTN_TOOL_TRIPLETAP: 0 -> 1 -> 0
        self.check_event_func_list[3] = [MtbEvent.is_BTN_TOUCH,
                                         MtbEvent.is_BTN_TOOL_TRIPLETAP]

        # Four-finger touching the device should generate the following events:
        #     BTN_TOUCH, and BTN_TOOL_QUADTAP: 0 -> 1 -> 0
        self.check_event_func_list[4] = [MtbEvent.is_BTN_TOUCH,
                                         MtbEvent.is_BTN_TOOL_QUADTAP]

        # Five-finger touching the device should generate the following events:
        #     BTN_TOUCH, and BTN_TOOL_QUINTTAP: 0 -> 1 -> 0
        self.check_event_func_list[5] = [MtbEvent.is_BTN_TOUCH,
                                         MtbEvent.is_BTN_TOOL_QUINTTAP]

        # Physical click should generate the following events:
        #     BTN_LEFT: 0 -> 1 -> 0
        self.check_event_func_click = [MtbEvent.is_BTN_LEFT,]

    def _calc_movement_for_axis(self, x, prev_x):
        """Calculate the distance moved in an axis."""
        return abs(x - prev_x) if prev_x is not None else 0

    def _calc_distance(self, p0, p1):
        """Calculate the distance in pixels between two points.

        @param p0: the (x, y) pair of the first point
        @param p1: the (x, y) pair of the second point
        """
        # The pairs are unpacked here instead of in the parameter list
        # since tuple parameters are not supported by python 3.
        x0, y0 = p0
        x1, y1 = p1
        dist_x = x1 - x0
        dist_y = y1 - y0
        return math.sqrt(dist_x * dist_x + dist_y * dist_y)

    def _init_dict(self, keys, value):
        """Initialize a dictionary over the keys with the same given value.

        Note: The following command does not always work:
                    dict.fromkeys(keys, value)
              It works when value is a simple type, e.g., an integer.
              However, if value is [] or {}, it does not work correctly.
              The reason is that if the value is [] or {}, all the keys would
              point to the same list or dictionary, which is not expected
              in most cases.
        """
        return {key: copy.deepcopy(value) for key in keys}

    def get_number_contacts(self):
        """Get the number of contacts (Tracking IDs)."""
        num_contacts = 0
        for packet in self.packets:
            for event in packet:
                if MtbEvent.is_new_contact(event):
                    num_contacts += 1
        return num_contacts

    def get_x_y(self, target_slot):
        """Extract x and y positions in the target slot.

        @param target_slot: the slot whose positions are extracted

        Return (list_x, list_y) with one entry per packet in which the
        target slot is alive and both x and y have been reported.
        """
        # The default slot is slot 0 if no slot number is assigned.
        # The rationale is that evdev is a state machine. It only reports
        # the change. Slot 0 would not be reported by evdev if last time
        # the last finger left the touch device was at slot 0.
        slot = 0

        # Should not write "list_x = list_y = []" below.
        # They would end up with pointing to the same list.
        list_x = []
        list_y = []
        prev_x = prev_y = None
        target_slot_live = False
        initial_default_slot_0 = True
        for packet in self.packets:
            if (slot == target_slot and slot == 0 and not target_slot_live and
                initial_default_slot_0):
                target_slot_live = True
                initial_default_slot_0 = False
            for event in packet:
                if MtbEvent.is_ABS_MT_SLOT(event):
                    slot = event[MTB.EV_VALUE]
                    if slot == target_slot and not target_slot_live:
                        target_slot_live = True
                if slot != target_slot:
                    continue

                # Update x value if available.
                if MtbEvent.is_ABS_MT_POSITION_X(event):
                    prev_x = event[MTB.EV_VALUE]
                # Update y value if available.
                elif MtbEvent.is_ABS_MT_POSITION_Y(event):
                    prev_y = event[MTB.EV_VALUE]
                # Check if the finger at the target_slot is leaving.
                elif MtbEvent.is_finger_leaving(event):
                    target_slot_live = False

            # If target_slot is alive, and both x and y have
            # been assigned values, append the x and y to the list no matter
            # whether x or y position is reported in the current packet.
            # This also handles the initial condition that no previous x or y
            # is reported yet.
            # Compare against None explicitly. A coordinate of 0 is a valid
            # position and must not be taken as "not reported yet".
            if (target_slot_live and prev_x is not None and
                prev_y is not None):
                list_x.append(prev_x)
                list_y.append(prev_y)
        return (list_x, list_y)

    def get_ordered_finger_paths(self, request_data_ready=True):
        """Construct the finger paths ordered by the occurrences of the
        finger contacts.

        @param request_data_ready: if set to true, it will not output the
                tid_data in a packet until all data including x, y, pressure,
                syn_time, etc. in the packet have been assigned.

        The finger_paths mapping the tid to its finger_path looks like
            {tid1: finger_path1,
             tid2: finger_path2,
             ...
            }
        where every tid represents a finger.

        A finger_path effectively consists of a list of tid_packets of the same
        tid in the event file. An example of its structure looks like
        finger_path:
            slot=0
            tid_packets = [tid_packet0, tid_packet1, tid_packet2, ...]

        A tid_packet looks like
            [100021.342104,         # syn_time
             (66, 100),             # point
             56,                    # pressure
             ...                    # maybe more attributes added later.
            ]

        This method is applicable when fingers are contacting and leaving
        the touch device continuously. The same slot number, e.g., slot 0 or
        slot 1, may be used for multiple times.

        Note that the finger contact starts at 0. The finger contacts look to
        be equal to the slot numbers in practice. However, this assumption
        seems not enforced in any document. For safety, we use the ordered
        finger paths dict here to guarantee that we could access the ith finger
        contact path data correctly.

        Also note that we do not sort finger paths by tracking IDs to derive
        the ordered dict because tracking IDs may wrap around.
        """
        # ordered_finger_paths_dict is an ordered dictionary of
        #     {tid: FingerPath}
        ordered_finger_paths_dict = OrderedDict()
        sm = MtbStateMachine()
        for packet in self.packets:
            # Inject events into the state machine to update its state.
            for event in packet:
                sm.add_event(event)

            # If there are N fingers (tids) in a packet, we will have
            # N tid_packet's in the current packet. The loop below is to
            # append every tid_packet into its corresponding finger_path for
            # every tracking id in the current packet.
            for tid, slot, tid_packet in sm.get_current_tid_data_for_all_tids(
                    request_data_ready):
                finger_path = ordered_finger_paths_dict.setdefault(
                        tid, FingerPath(slot, []))
                if tid_packet:
                    finger_path.tid_packets.append(tid_packet)

        return ordered_finger_paths_dict

    def get_ordered_finger_path(self, finger, attr):
        """Extract the specified attribute from the packets of the ith finger
        contact.

        @param finger: the ith finger contact
        @param attr: an attribute in a tid packet which could be either
                'point', 'pressure', or 'syn_time'

        Return the list of the attribute values, or [] if there is no such
        finger contact.
        """
        # finger_paths is a list ordered by the occurrences of finger contacts
        # It is converted to a list explicitly so that it could be indexed
        # no matter whether values() returns a list or a view.
        finger_paths = list(self.get_ordered_finger_paths().values())
        if 0 <= finger < len(finger_paths):
            return finger_paths[finger].get(attr)
        return []

    def get_slot_data(self, slot, attr):
        """Extract the attribute data of the specified slot.

        Only the first finger path found in the specified slot is used.

        @param slot: the slot number
        @param attr: an attribute in a tid packet which could be either
                'point', 'pressure', or 'syn_time'
        """
        for finger_path in self.get_ordered_finger_paths().values():
            if finger_path.slot == slot:
                return finger_path.get(attr)
        return []

    def get_max_distance_of_all_tracking_ids(self):
        """Get the max moving distance of all tracking IDs.

        ValueError is raised by max() if there is no radius at all.
        """
        # Flatten the radii of all finger paths before taking the maximum.
        # Taking max() over the radii sequences themselves would compare them
        # lexicographically, and would miss a larger radius that is not in
        # the first position of its sequence.
        # NOTE(review): get_two_min_radii is assumed to return an iterable of
        # radii, as suggested by get_list_of_rocs_of_all_tracking_ids().
        return max(radius
                   for finger_path in self.get_ordered_finger_paths().values()
                   for radius in get_two_min_radii(finger_path.get('point')))

    def get_list_of_rocs_of_all_tracking_ids(self):
        """For each tracking ID, compute the minimal enclosing circles.

        Return a list of radii of such minimal enclosing circles of
        all tracking IDs.
        Note: rocs denotes the radii of circles
        """
        list_rocs = []
        for finger_path in self.get_ordered_finger_paths().values():
            # Convert the point coordinates in pixels to in mms.
            points_in_mm = [Point(*self.device.pixel_to_mm(p.value()))
                            for p in finger_path.get('point')]
            list_rocs += get_two_min_radii(points_in_mm)
        return list_rocs

    def get_x_y_multiple_slots(self, target_slots):
        """Extract points in multiple slots.

        Only the packets with all specified slots are extracted.
        This is useful to collect packets for pinch to zoom.

        @param target_slots: the slots whose positions are extracted

        Return (list_x, list_y) where both are dictionaries mapping every
        target slot to its list of positions.
        """
        # Initialize slot_exists dictionary to False
        slot_exists = dict.fromkeys(target_slots, False)

        # Set the initial slot number to 0 because evdev is a state machine,
        # and may not present slot 0.
        slot = 0
        # Initialize the following dict to []
        # Don't use "dict.fromkeys(target_slots, [])"
        list_x = self._init_dict(target_slots, [])
        list_y = self._init_dict(target_slots, [])
        x = self._init_dict(target_slots, None)
        y = self._init_dict(target_slots, None)
        for packet in self.packets:
            for event in packet:
                if MtbEvent.is_ABS_MT_SLOT(event):
                    slot = event[MTB.EV_VALUE]
                if slot not in target_slots:
                    continue

                if MtbEvent.is_ABS_MT_TRACKING_ID(event):
                    if MtbEvent.is_new_contact(event):
                        slot_exists[slot] = True
                    elif MtbEvent.is_finger_leaving(event):
                        slot_exists[slot] = False
                elif MtbEvent.is_ABS_MT_POSITION_X(event):
                    x[slot] = event[MTB.EV_VALUE]
                elif MtbEvent.is_ABS_MT_POSITION_Y(event):
                    y[slot] = event[MTB.EV_VALUE]

            # Note:
            # - All slot_exists must be True to append x, y positions for the
            #   slots.
            # - All x and y values for all slots must have been reported once.
            #   (This handles the initial condition that no previous x or y
            #    is reported yet.)
            # - If either x or y positions are reported in the current packet,
            #   append x and y to the list of that slot.
            #   (This handles the condition that only x or y is reported.)
            # - Even in the case that neither x nor y is reported in current
            #   packet, cmt driver constructs and passes hwstate to gestures.
            # - The positions are compared against None explicitly since
            #   a coordinate of 0 is a valid position.
            if (all(slot_exists.values()) and
                all(value is not None for value in x.values()) and
                all(value is not None for value in y.values())):
                for s in target_slots:
                    list_x[s].append(x[s])
                    list_y[s].append(y[s])

        return (list_x, list_y)

    def get_points_multiple_slots(self, target_slots):
        """Get the points in multiple slots.

        Return a dictionary mapping every target slot to its list of
        (x, y) pairs.
        """
        list_x, list_y = self.get_x_y_multiple_slots(target_slots)
        # The zipped points are converted to lists explicitly since the
        # callers take the length of the points and index them.
        points_list = [list(zip(list_x[slot], list_y[slot]))
                       for slot in target_slots]
        points_dict = dict(zip(target_slots, points_list))
        return points_dict

    def get_relative_motion(self, target_slots):
        """Get the relative motion of the two target slots.

        Return the distance between the two slots at the end minus the
        distance between them at the beginning.
        """
        # The slots in target_slots could be (0, 1), (1, 2) or other
        # possibilities.
        slot_a, slot_b = target_slots
        points_dict = self.get_points_multiple_slots(target_slots)
        points_slot_a = points_dict[slot_a]
        points_slot_b = points_dict[slot_b]

        # if only 0 or 1 point observed, the relative motion is 0.
        if len(points_slot_a) <= 1 or len(points_slot_b) <= 1:
            return 0

        distance_begin = self._calc_distance(points_slot_a[0], points_slot_b[0])
        distance_end = self._calc_distance(points_slot_a[-1], points_slot_b[-1])
        relative_motion = distance_end - distance_begin
        return relative_motion

    def get_points(self, target_slot):
        """Get the points in the target slot as a list of (x, y) pairs."""
        list_x, list_y = self.get_x_y(target_slot)
        # Return a real list since the callers take its length and index it.
        return list(zip(list_x, list_y))

    def get_distances(self, target_slot):
        """Get the distances of neighbor points in the target slot."""
        points = self.get_points(target_slot)
        return [self._calc_distance(point, next_point)
                for point, next_point in zip(points, points[1:])]

    def get_range(self):
        """Get the min and max values of (x, y) positions.

        Return (min_x, max_x, min_y, max_y). The values remain infinities
        for an axis if no position is reported for it.
        """
        min_x = min_y = float('infinity')
        max_x = max_y = float('-infinity')
        for packet in self.packets:
            for event in packet:
                if MtbEvent.is_ABS_MT_POSITION_X(event):
                    x = event[MTB.EV_VALUE]
                    min_x = min(min_x, x)
                    max_x = max(max_x, x)
                elif MtbEvent.is_ABS_MT_POSITION_Y(event):
                    y = event[MTB.EV_VALUE]
                    min_y = min(min_y, y)
                    max_y = max(max_y, y)
        return (min_x, max_x, min_y, max_y)

    def get_total_motion(self, target_slot):
        """Get the total motion in the target slot.

        Return (accu_x, accu_y), the accumulated absolute movements in the
        x axis and in the y axis respectively.
        """
        prev_x = prev_y = None
        accu_x = accu_y = 0
        slot = None
        for packet in self.packets:
            for event in packet:
                if MtbEvent.is_ABS_MT_SLOT(event):
                    slot = event[MTB.EV_VALUE]
                elif (MtbEvent.is_ABS_MT_POSITION_X(event) and
                      slot == target_slot):
                    x = event[MTB.EV_VALUE]
                    accu_x += self._calc_movement_for_axis(x, prev_x)
                    prev_x = x
                elif (MtbEvent.is_ABS_MT_POSITION_Y(event) and
                      slot == target_slot):
                    y = event[MTB.EV_VALUE]
                    accu_y += self._calc_movement_for_axis(y, prev_y)
                    prev_y = y
        return (accu_x, accu_y)

    def get_max_distance(self, slot, unit):
        """Get the max distance between any two points of the specified slot."""
        points = self.get_slot_data(slot, 'point')
        return self.get_max_distance_from_points(points, unit)

    def get_max_distance_from_points(self, points, unit):
        """Get the max distance between any two points.

        @param points: a list of points
        @param unit: the points are converted from pixel to mm before the
                distance is calculated if unit is UNIT.MM.

        Return 0 if fewer than two farthest points are found.
        """
        two_farthest_points = get_two_farthest_points(points)
        if len(two_farthest_points) < 2:
            return 0

        # Convert the point coordinates from pixel to mm if necessary.
        if unit == UNIT.MM:
            p1, p2 = [Point(*self.device.pixel_to_mm(p.value()))
                      for p in two_farthest_points]
        else:
            p1, p2 = two_farthest_points

        return p1.distance(p2)

    def get_largest_gap_ratio(self, target_slot):
        """Get the largest gap ratio in the target slot.

        gap_ratio_with_prev = curr_gap / prev_gap
        gap_ratio_with_next = curr_gap / next_gap

        This function tries to find the largest gap_ratio_with_prev
        with the restriction that gap_ratio_with_next is larger than
        RATIO_THRESHOLD_CURR_GAP_TO_NEXT_GAP.

        The ratio threshold is used to prevent the gaps detected in a swipe.
        Note that in a swipe, the gaps tends to become larger and larger.

        Return float('-infinity') if no gap satisfies the restrictions.
        """
        RATIO_THRESHOLD_CURR_GAP_TO_NEXT_GAP = 1.2
        GAP_LOWER_BOUND = 10

        gaps = self.get_distances(target_slot)
        largest_gap_ratio = float('-infinity')
        for index in range(1, len(gaps) - 1):
            # Use 1 as the lower bound of the neighbor gaps to avoid
            # dividing by zero.
            prev_gap = max(gaps[index - 1], 1)
            curr_gap = gaps[index]
            next_gap = max(gaps[index + 1], 1)
            gap_ratio_with_prev = curr_gap / prev_gap
            gap_ratio_with_next = curr_gap / next_gap
            if (curr_gap >= GAP_LOWER_BOUND and
                gap_ratio_with_prev > largest_gap_ratio and
                gap_ratio_with_next > RATIO_THRESHOLD_CURR_GAP_TO_NEXT_GAP):
                largest_gap_ratio = gap_ratio_with_prev

        return largest_gap_ratio

    def _is_large(self, numbers, index):
        """Is the number at the index a large number compared to the moving
        average of the previous LEN_MOVING_AVERAGE numbers? This is used to
        determine if a distance is a level jump."""
        if index < self.LEN_MOVING_AVERAGE + 1:
            return False
        moving_sum = sum(numbers[index - self.LEN_MOVING_AVERAGE : index])
        moving_average = float(moving_sum) / self.LEN_MOVING_AVERAGE
        cond1 = numbers[index] >= self.LEVEL_JUMP_RATIO * moving_average
        cond2 = numbers[index] >= self.LEVEL_JUMP_MAXIUM_ALLOWED
        return cond1 and cond2

    def _accumulate_level_jumps(self, level_jumps):
        """Accumulate level jumps.

        The consecutive level jumps of the same sign are summed up.
        """
        if not level_jumps:
            return []

        is_positive = level_jumps[0] > 0
        tmp_sum = 0
        accumulated_level_jumps = []
        for jump in level_jumps:
            # If current sign is the same as previous ones, accumulate it.
            if (jump > 0) is is_positive:
                tmp_sum += jump
            # If current jump has changed its sign, reset the tmp_sum to
            # accumulate the level_jumps onwards.
            else:
                accumulated_level_jumps.append(tmp_sum)
                tmp_sum = jump
                is_positive = not is_positive

        if tmp_sum != 0:
            accumulated_level_jumps.append(tmp_sum)
        return accumulated_level_jumps

    def get_largest_accumulated_level_jumps(self, displacements):
        """Get the largest accumulated level jumps in displacements.

        Return 0 if there are too few displacements or no level jump.
        """
        largest_accumulated_level_jumps = 0
        if len(displacements) < self.LEN_MOVING_AVERAGE + 1:
            return largest_accumulated_level_jumps

        # Truncate some elements at both ends of the list which are not stable.
        displacements = displacements[self.LEN_DISCARD: -self.LEN_DISCARD]
        # distances must be a real list since _is_large() slices and indexes
        # it.
        distances = [abs(disp) for disp in displacements]

        # E.g., displacements= [5, 6, 5, 6, 20, 3, 5, 4, 6, 21, 4, ...]
        #       level_jumps = [20, 21, ...]
        level_jumps = [disp for i, disp in enumerate(displacements)
                       if self._is_large(distances, i)]

        # E.g., level_jumps= [20, 21, -18, -25, 22, 18, -19]
        #       accumulated_level_jumps = [41, -43, 40, -19]
        #       largest_accumulated_level_jumps = 43
        accumulated_level_jumps = self._accumulate_level_jumps(level_jumps)
        if accumulated_level_jumps:
            largest_accumulated_level_jumps = max(
                    abs(jump) for jump in accumulated_level_jumps)

        return largest_accumulated_level_jumps

    def get_displacement(self, target_slot):
        """Get the displacement in the target slot.

        Return a dictionary mapping AXIS.X and AXIS.Y to the lists of
        differences between neighbor positions in the axis.
        """
        displace = [[p1 - p0 for p0, p1 in zip(axis, axis[1:])]
                    for axis in self.get_x_y(target_slot)]
        displacement_dict = dict(zip((AXIS.X, AXIS.Y), displace))
        return displacement_dict

    def calc_displacement(self, numbers):
        """Calculate the displacements in a list of numbers.

        Return [] if there are fewer than two numbers.
        """
        if len(numbers) <= 1:
            return []
        return [numbers[i + 1] - numbers[i] for i in range(len(numbers) - 1)]

    def get_displacements_for_slots(self, min_slot):
        """Get the displacements for slots >= min_slot."""
finger_paths = self.get_ordered_finger_paths() 1018 1019 # Remove those tracking IDs with slots < min_slot 1020 for tid, finger_path in finger_paths.items(): 1021 if finger_path.slot < min_slot: 1022 del finger_paths[tid] 1023 1024 # Calculate the displacements of the coordinates in the tracking IDs. 1025 displacements = defaultdict(dict) 1026 for tid, finger_path in finger_paths.items(): 1027 finger_path_values = [p.value() for p in finger_path.get('point')] 1028 if finger_path_values: 1029 list_x, list_y = zip(*finger_path_values) 1030 else: 1031 list_x, list_y = [], [] 1032 displacements[tid][MTB.SLOT] = finger_path.slot 1033 displacements[tid][AXIS.X] = self.calc_displacement(list_x) 1034 displacements[tid][AXIS.Y] = self.calc_displacement(list_y) 1035 1036 return displacements 1037 1038 def _get_segments_by_length(self, src_list, segment_flag, ratio): 1039 """Get the segments based on segment_flag and the ratio of the 1040 src_list length (size). 1041 1042 @param src_list: could be list_x, list_y, or list_t 1043 @param segment_flag: indicating which segment (the begin, the end, or 1044 the middle segment) to extract 1045 @param ratio: the ratio of the time interval of the segment 1046 """ 1047 end_size = int(round(len(src_list) * ratio)) 1048 if segment_flag == VAL.WHOLE: 1049 return src_list 1050 elif segment_flag == VAL.MIDDLE: 1051 return src_list[end_size: -end_size] 1052 elif segment_flag == VAL.BEGIN: 1053 return src_list[: end_size] 1054 elif segment_flag == VAL.END: 1055 return src_list[-end_size:] 1056 elif segment_flag == VAL.BOTH_ENDS: 1057 bgn_segment = src_list[: end_size] 1058 end_segment = src_list[-end_size:] 1059 return bgn_segment + end_segment 1060 else: 1061 return None 1062 1063 def get_segments_x_and_y(self, ax, ay, segment_flag, ratio): 1064 """Get the segments for both x and y axes.""" 1065 segment_x = self._get_segments_by_length(ax, segment_flag, ratio) 1066 segment_y = self._get_segments_by_length(ay, segment_flag, ratio) 1067 return 
(segment_x, segment_y) 1068 1069 def get_segments_by_distance(self, list_t, list_coord, segment_flag, ratio): 1070 """Partition list_coord into the begin, the middle, and the end 1071 segments based on segment_flag and the ratio. And then use the 1072 derived indexes to partition list_t. 1073 1074 @param list_t: the list of syn_report time instants 1075 @param list_coord: could be list_x, list_y 1076 @param segment_flag: indicating which segment (the being, the end, or 1077 the middle segment) to extract 1078 @param ratio: the ratio of the distance of the begin/end segment 1079 with the value between 0.0 and 1.0 1080 """ 1081 def _find_boundary_index(list_coord, boundary_distance): 1082 """Find the boundary index i such that 1083 abs(list_coord[i] - list_coord[0]) > boundary_distance 1084 1085 @param list_coord: a list of coordinates 1086 @param boundary_distance: the min distance between boundary_coord 1087 and list_coord[0] 1088 """ 1089 for i, c in enumerate(list_coord): 1090 if abs(c - list_coord[0]) > boundary_distance: 1091 return i 1092 1093 end_to_end_distance = abs(list_coord[-1] - list_coord[0]) 1094 first_idx_mid_seg = _find_boundary_index( 1095 list_coord, end_to_end_distance * ratio) 1096 last_idx_mid_seg = _find_boundary_index( 1097 list_coord, end_to_end_distance * (1 - ratio)) 1098 1099 if segment_flag == VAL.WHOLE: 1100 segment_coord = list_coord 1101 segment_t = list_t 1102 elif segment_flag == VAL.MIDDLE: 1103 segment_coord = list_coord[first_idx_mid_seg:last_idx_mid_seg] 1104 segment_t = list_t[first_idx_mid_seg:last_idx_mid_seg] 1105 elif segment_flag == VAL.BEGIN: 1106 segment_coord = list_coord[:first_idx_mid_seg] 1107 segment_t = list_t[:first_idx_mid_seg] 1108 elif segment_flag == VAL.END: 1109 segment_coord = list_coord[last_idx_mid_seg:] 1110 segment_t = list_t[last_idx_mid_seg:] 1111 else: 1112 segment_coord = [] 1113 segment_t = [] 1114 1115 return (segment_t, segment_coord) 1116 1117 def get_segments(self, list_t, list_coord, 
segment_flag, ratio): 1118 """Partition list_t and list_coord into the segments specified by 1119 the segment_flag and the ratio. 1120 1121 If the list_coord stretches long enough, it represents a normal 1122 line. We will partition list_t and list_coord by distance. 1123 1124 On the other hand, if the min and max values in list_coord are 1125 close to each other, it probably does not represent a line. We will 1126 partition list_t and list_coord by time in this case. 1127 E.g., in the follow cases, it is better to partition using length 1128 instead of using distance. 1129 list_coord = [105, 105, 105, 105, 105, 105, 105, 105, 105, 105] or 1130 list_coord = [104, 105, 105, 105, 105, 105, 105, 105, 105, 105] or 1131 list_coord = [105, 105, 105, 105, 105, 105, 105, 105, 105, 106] 1132 1133 @param list_t: the list of syn_report time instants 1134 @param list_coord: could be list_x, list_y 1135 @param segment_flag: indicating which segment (the being, the end, or 1136 the middle segment) to extract 1137 @param ratio: the ratio of the distance of the begin/end segment 1138 with the value between 0.0 and 1.0 1139 """ 1140 MIN_STRAIGHT_LINE_DIST = 20 1141 if (max(list_coord) - min(list_coord)) > MIN_STRAIGHT_LINE_DIST: 1142 return self.get_segments_by_distance(list_t, list_coord, 1143 segment_flag, ratio) 1144 else: 1145 return (self._get_segments_by_length(list_t, segment_flag, ratio), 1146 self._get_segments_by_length(list_coord, segment_flag, 1147 ratio)) 1148 1149 def get_reversed_motions(self, target_slot, direction, 1150 segment_flag=VAL.WHOLE, ratio=None): 1151 """Get the total reversed motions in the specified direction 1152 in the target slot. 1153 1154 Only the reversed motions specified by the segment_flag are taken. 
1155 The segment_flag could be 1156 VAL.BEGIN: the begin segment 1157 VAL.MIDDLE : the middle segment 1158 VAL.END : the end segment 1159 VAL.BOTH_ENDS : the segments at both ends 1160 VAL.WHOLE: the whole line 1161 1162 The ratio represents the ratio of the BEGIN or END segment to the whole 1163 segment. 1164 1165 If direction is in HORIZONTAL_DIRECTIONS, consider only x axis. 1166 If direction is in VERTICAL_DIRECTIONS, consider only y axis. 1167 If direction is in DIAGONAL_DIRECTIONS, consider both x and y axes. 1168 1169 Assume that the displacements in GV.LR (moving from left to right) 1170 in the X axis are: 1171 1172 [10, 12, 8, -9, -2, 6, 8, 11, 12, 5, 2] 1173 1174 Its total reversed motion = (-9) + (-2) = -11 1175 """ 1176 # Define the axis moving directions dictionary 1177 POSITIVE = 'positive' 1178 NEGATIVE = 'negative' 1179 AXIS_MOVING_DIRECTIONS = { 1180 GV.LR: {AXIS.X: POSITIVE}, 1181 GV.RL: {AXIS.X: NEGATIVE}, 1182 GV.TB: {AXIS.Y: POSITIVE}, 1183 GV.BT: {AXIS.Y: NEGATIVE}, 1184 GV.CR: {AXIS.X: POSITIVE}, 1185 GV.CL: {AXIS.X: NEGATIVE}, 1186 GV.CB: {AXIS.Y: POSITIVE}, 1187 GV.CT: {AXIS.Y: NEGATIVE}, 1188 GV.BLTR: {AXIS.X: POSITIVE, AXIS.Y: NEGATIVE}, 1189 GV.BRTL: {AXIS.X: NEGATIVE, AXIS.Y: NEGATIVE}, 1190 GV.TRBL: {AXIS.X: NEGATIVE, AXIS.Y: POSITIVE}, 1191 GV.TLBR: {AXIS.X: POSITIVE, AXIS.Y: POSITIVE}, 1192 } 1193 1194 axis_moving_directions = AXIS_MOVING_DIRECTIONS.get(direction) 1195 func_positive = lambda n: n > 0 1196 func_negative = lambda n: n < 0 1197 reversed_functions = {POSITIVE: func_negative, NEGATIVE: func_positive} 1198 displacement_dict = self.get_displacement(target_slot) 1199 reversed_motions = {} 1200 for axis in AXIS.LIST: 1201 axis_moving_direction = axis_moving_directions.get(axis) 1202 if axis_moving_direction is None: 1203 continue 1204 displacement = displacement_dict[axis] 1205 displacement_segment = self._get_segments_by_length( 1206 displacement, segment_flag, ratio) 1207 reversed_func = 
reversed_functions[axis_moving_direction] 1208 reversed_motions[axis] = sum(filter(reversed_func, 1209 displacement_segment)) 1210 return reversed_motions 1211 1212 def get_num_packets(self, target_slot): 1213 """Get the number of packets in the target slot.""" 1214 list_x, list_y = self.get_x_y(target_slot) 1215 return len(list_x) 1216 1217 def get_list_syn_time(self, finger): 1218 """Get the list of syn_time instants from the packets of the ith finger 1219 contact if finger is not None. Otherwise, use all packets. 1220 1221 @param finger : the specified ith finger contact. 1222 If a finger contact is specified, extract only the list of 1223 syn_time from this finger contact. 1224 Otherwise, when the finger contact is set to None, take all 1225 packets into account. Note that the finger contact number 1226 starts from 0. 1227 1228 Note: the last event in a packet, represented as packet[-1], is 1229 'SYN_REPORT' of which the event time is the 'syn_time'. 1230 """ 1231 return (self.get_ordered_finger_path(finger, 'syn_time') 1232 if isinstance(finger, int) else 1233 [packet[-1].get(MTB.EV_TIME) for packet in self.packets]) 1234 1235 def _call_check_event_func(self, event, expected_value, check_event_result, 1236 check_event_func): 1237 """Call all functions in check_event_func and return the results. 1238 1239 Note that since check_event_result is a dictionary, it is passed 1240 by reference. 1241 """ 1242 for func in check_event_func: 1243 if func(event): 1244 event_value = event[MTB.EV_VALUE] 1245 check_event_result[func] = (event_value == expected_value) 1246 break 1247 1248 def _get_event_cycles(self, check_event_func): 1249 """A generic method to get the number of event cycles. 
1250 1251 For a tap, its event cycle looks like: 1252 (1) finger touching the touch device: 1253 BTN_TOOL_FINGER: 0-> 1 1254 BTN_TOUCH: 0 -> 1 1255 (2) finger leaving the touch device: 1256 BTN_TOOL_FINGER: 1-> 0 1257 BTN_TOUCH: 1 -> 0 1258 1259 For a one-finger physical click, its event cycle looks like: 1260 (1) finger clicking and pressing: 1261 BTN_LEFT : 0-> 1 1262 BTN_TOOL_FINGER: 0-> 1 1263 BTN_TOUCH: 0 -> 1 1264 (2) finger leaving: 1265 BTN_LEFT : 1-> 0 1266 BTN_TOOL_FINGER: 1-> 0 1267 BTN_TOUCH: 1 -> 0 1268 1269 This method counts how many such cycles there are in the packets. 1270 """ 1271 # Initialize all check_event_result to False 1272 # when all_events_observed is False and all check_event_result are True 1273 # => all_events_observed is set to True 1274 # when all_events_observed is True and all check_event_result are True 1275 # => all_events_observed is set to False, and 1276 # count is increased by 1 1277 check_event_result = self._init_dict(check_event_func, False) 1278 all_events_observed = False 1279 count = 0 1280 for packet in self.packets: 1281 for event in packet: 1282 if all_events_observed: 1283 expected_value = 0 1284 self._call_check_event_func(event, expected_value, 1285 check_event_result, 1286 check_event_func) 1287 if all(check_event_result.values()): 1288 all_events_observed = False 1289 check_event_result = self._init_dict(check_event_func, 1290 False) 1291 count += 1 1292 else: 1293 expected_value = 1 1294 self._call_check_event_func(event, expected_value, 1295 check_event_result, 1296 check_event_func) 1297 if all(check_event_result.values()): 1298 all_events_observed = True 1299 check_event_result = self._init_dict(check_event_func, 1300 False) 1301 return count 1302 1303 def _get_event_cycles_for_num_fingers(self, num_fingers): 1304 return self._get_event_cycles(self.check_event_func_list[num_fingers]) 1305 1306 def verify_exact_number_fingers_touch(self, num_fingers): 1307 """Verify the exact number of fingers touching the 
device. 1308 1309 Example: for a two-finger touch 1310 2-finger touch cycles should be equal to 1 1311 3/4/5-finger touch cycles should be equal to 0 1312 Don't care about 1-finger touch cycles which is not deterministic. 1313 """ 1314 range_fingers = range(1, self.MAX_FINGERS) 1315 flag_check = self._init_dict(range_fingers, True) 1316 for f in range_fingers: 1317 cycles = self._get_event_cycles_for_num_fingers(f) 1318 if f == num_fingers: 1319 flag_check[f] = cycles == 1 1320 elif f > num_fingers: 1321 flag_check[f] = cycles == 0 1322 return all(flag_check) 1323 1324 def get_physical_clicks(self, num_fingers): 1325 """Get the count of physical clicks for the given number of fingers.""" 1326 flag_fingers_touch = self.verify_exact_number_fingers_touch(num_fingers) 1327 click_cycles = self._get_event_cycles(self.check_event_func_click) 1328 return click_cycles if flag_fingers_touch else 0 1329 1330 def get_raw_physical_clicks(self): 1331 """Get how many BTN_LEFT click events have been seen. 1332 1333 When calculating the raw BTN_LEFT click count, this method does not 1334 consider whether BTN_LEFT comes with the correct finger (tracking) IDs. 1335 A correct BTN_LEFT click consists of value 1 followed by value 0. 1336 """ 1337 click_count = 0 1338 btn_left_was_pressed = False 1339 for packet in self.packets: 1340 for event in packet: 1341 # when seeing BTN_LEFT value: 0 -> 1 1342 if (MtbEvent.is_BTN_LEFT_value(event, 1) and 1343 not btn_left_was_pressed): 1344 btn_left_was_pressed = True 1345 # when seeing BTN_LEFT value: 1 -> 0 1346 elif (MtbEvent.is_BTN_LEFT_value(event, 0) and 1347 btn_left_was_pressed): 1348 btn_left_was_pressed = False 1349 click_count += 1 1350 return click_count 1351 1352 def get_correct_physical_clicks(self, number_fingers): 1353 """Get the count of physical clicks correctly overlap with 1354 the given number of fingers. 
1355 1356 @param num_fingers: the expected number of fingers when a physical 1357 click is seen 1358 """ 1359 sm = MtbStateMachine() 1360 correct_click_count = 0 1361 click_with_correct_number_fingers = False 1362 for packet in self.packets: 1363 btn_left_was_pressed = False 1364 btn_left_was_released = False 1365 for event in packet: 1366 sm.add_event(event) 1367 if MtbEvent.is_BTN_LEFT_value(event, 1): 1368 btn_left_was_pressed = True 1369 elif MtbEvent.is_BTN_LEFT_value(event, 0): 1370 btn_left_was_released = True 1371 sm.get_current_tid_data_for_all_tids() 1372 1373 # Check the number of fingers only after all events in this packet 1374 # have been processed. 1375 if btn_left_was_pressed or btn_left_was_released: 1376 click_with_correct_number_fingers |= (sm.number_fingers == 1377 number_fingers) 1378 1379 # If BTN_LEFT was released, reset the flag and increase the count. 1380 if btn_left_was_released and click_with_correct_number_fingers: 1381 correct_click_count += 1 1382 click_with_correct_number_fingers = False 1383 1384 return correct_click_count 1385 1386 1387 class MtbParser: 1388 """Touch device MTB event Parser.""" 1389 1390 def __init__(self): 1391 self._get_event_re_patt() 1392 1393 def _get_event_re_patt(self): 1394 """Construct the regular expression search pattern of MTB events. 
1395 1396 An ordinary event looks like 1397 Event: time 133082.748019, type 3 (EV_ABS), code 0 (ABS_X), value 316 1398 A SYN_REPORT event looks like 1399 Event: time 10788.289613, -------------- SYN_REPORT ------------ 1400 """ 1401 # Get the pattern of an ordinary event 1402 event_patt_time = 'Event:\s*time\s*(\d+\.\d+)' 1403 event_patt_type = 'type\s*(\d+)\s*\(\w+\)' 1404 event_patt_code = 'code\s*(\d+)\s*\(\w+\)' 1405 event_patt_value = 'value\s*(-?\d+)' 1406 event_sep = ',\s*' 1407 event_patt = event_sep.join([event_patt_time, 1408 event_patt_type, 1409 event_patt_code, 1410 event_patt_value]) 1411 self.event_re_patt = re.compile(event_patt, re.I) 1412 1413 # Get the pattern of the SYN_REPORT event 1414 event_patt_type_SYN_REPORT = '-+\s*SYN_REPORT\s-+' 1415 event_patt_SYN_REPORT = event_sep.join([event_patt_time, 1416 event_patt_type_SYN_REPORT]) 1417 self.event_re_patt_SYN_REPORT = re.compile(event_patt_SYN_REPORT, re.I) 1418 1419 def _get_event_dict_ordinary(self, line): 1420 """Construct the event dictionary for an ordinary event.""" 1421 result = self.event_re_patt.search(line) 1422 if result is not None: 1423 return MtbParser.make_ev_dict((float(result.group(1)), 1424 int(result.group(2)), 1425 int(result.group(3)), 1426 int(result.group(4)))) 1427 return {} 1428 1429 @staticmethod 1430 def make_ev_dict(event): 1431 (ev_time, ev_type, ev_code, ev_value) = event 1432 ev_dict = {MTB.EV_TIME: ev_time, 1433 MTB.EV_TYPE: ev_type, 1434 MTB.EV_CODE: ev_code, 1435 MTB.EV_VALUE: ev_value} 1436 return ev_dict 1437 1438 @staticmethod 1439 def make_syn_report_ev_dict(syn_time): 1440 """Make the event dictionary for a SYN_REPORT event.""" 1441 ev_dict = {} 1442 ev_dict[MTB.EV_TIME] = float(syn_time) 1443 ev_dict[MTB.SYN_REPORT] = True 1444 return ev_dict 1445 1446 def _get_event_dict_SYN_REPORT(self, line): 1447 """Construct the event dictionary for a SYN_REPORT event.""" 1448 result = self.event_re_patt_SYN_REPORT.search(line) 1449 return ({} if result is None else 
1450 MtbParser.make_syn_report_ev_dict(result.group(1))) 1451 1452 def _get_event_dict(self, line): 1453 """Construct the event dictionary.""" 1454 EVENT_FUNC_LIST = [self._get_event_dict_ordinary, 1455 self._get_event_dict_SYN_REPORT] 1456 for get_event_func in EVENT_FUNC_LIST: 1457 ev_dict = get_event_func(line) 1458 if ev_dict: 1459 return ev_dict 1460 return False 1461 1462 def _is_SYN_REPORT(self, ev_dict): 1463 """Determine if this event is SYN_REPORT.""" 1464 return ev_dict.get(MTB.SYN_REPORT, False) 1465 1466 def parse(self, raw_event): 1467 """Parse the raw event string into a list of event dictionary.""" 1468 ev_list = [] 1469 packets = [] 1470 start_flag = False 1471 finger_off = False 1472 for line in raw_event: 1473 ev_dict = self._get_event_dict(line) 1474 if ev_dict: 1475 start_flag = True 1476 1477 # Handle the case when a finger-off event is followed 1478 # immediately by a finger-on event in the same packet. 1479 if MtbEvent.is_finger_leaving(ev_dict): 1480 finger_off = True 1481 # Append a SYN_REPORT event and flush the ev_list to packets 1482 # when the case described above does occur. 1483 elif MtbEvent.is_new_contact(ev_dict) and finger_off: 1484 last_ev_time = ev_list[-1][MTB.EV_TIME] 1485 ev_list.append( 1486 MtbParser.make_syn_report_ev_dict(last_ev_time)) 1487 packets.append(ev_list) 1488 ev_list = [] 1489 1490 ev_list.append(ev_dict) 1491 if self._is_SYN_REPORT(ev_dict): 1492 packets.append(ev_list) 1493 ev_list = [] 1494 finger_off = False 1495 1496 elif start_flag: 1497 logging.warning(' Warn: format problem in event:\n %s' % line) 1498 return packets 1499 1500 def parse_file(self, file_name): 1501 """Parse raw device events in the given file name.""" 1502 packets = None 1503 if os.path.isfile(file_name): 1504 with open(file_name) as f: 1505 packets = self.parse(f) 1506 return packets 1507 1508 1509 if __name__ == '__main__': 1510 # Read a device file, and convert it to pretty packet format. 
1511 if len(sys.argv) != 2 or not os.path.exists(sys.argv[1]): 1512 print 'Usage: %s device_file' % sys.argv[0] 1513 exit(1) 1514 1515 with open(sys.argv[1]) as event_file: 1516 packets = MtbParser().parse(event_file) 1517 for packet in packets: 1518 print make_pretty_packet(packet) 1519