1 #! /usr/bin/env python 2 3 ## This file is part of Scapy 4 ## See http://www.secdev.org/projects/scapy for more informations 5 ## Copyright (C) Philippe Biondi <phil (at] secdev.org> 6 ## This program is published under a GPLv2 license 7 8 from __future__ import print_function 9 import os 10 import subprocess 11 import itertools 12 import collections 13 import time 14 import scapy.modules.six as six 15 from threading import Lock, Thread 16 import scapy.utils 17 18 from scapy.automaton import Message, select_objects, SelectableObject 19 from scapy.consts import WINDOWS 20 from scapy.error import log_interactive, warning 21 from scapy.config import conf 22 from scapy.utils import get_temp_file, do_graph 23 24 import scapy.arch 25 26 class PipeEngine(SelectableObject): 27 pipes = {} 28 @classmethod 29 def list_pipes(cls): 30 for pn,pc in sorted(cls.pipes.items()): 31 doc = pc.__doc__ or "" 32 if doc: 33 doc = doc.splitlines()[0] 34 print("%20s: %s" % (pn, doc)) 35 @classmethod 36 def list_pipes_detailed(cls): 37 for pn,pc in sorted(cls.pipes.items()): 38 if pc.__doc__: 39 print("###### %s\n %s" % (pn ,pc.__doc__)) 40 else: 41 print("###### %s" % pn) 42 43 def __init__(self, *pipes): 44 self.active_pipes = set() 45 self.active_sources = set() 46 self.active_drains = set() 47 self.active_sinks = set() 48 self._add_pipes(*pipes) 49 self.thread_lock = Lock() 50 self.command_lock = Lock() 51 self.__fd_queue = collections.deque() 52 self.__fdr,self.__fdw = os.pipe() 53 self.thread = None 54 def __getattr__(self, attr): 55 if attr.startswith("spawn_"): 56 dname = attr[6:] 57 if dname in self.pipes: 58 def f(*args, **kargs): 59 k = self.pipes[dname] 60 p = k(*args, **kargs) 61 self.add(p) 62 return p 63 return f 64 raise AttributeError(attr) 65 66 def check_recv(self): 67 """As select.select is not available, we check if there 68 is some data to read by using a list that stores pointers.""" 69 return len(self.__fd_queue) > 0 70 71 def fileno(self): 72 return self.__fdr 73 74 def _read_cmd(self): 75 os.read(self.__fdr,1) 76 return self.__fd_queue.popleft() 77 78 def _write_cmd(self, _cmd): 79 self.__fd_queue.append(_cmd) 80 os.write(self.__fdw, b"X") 81 self.call_release() 82 83 def add_one_pipe(self, pipe): 84 self.active_pipes.add(pipe) 85 if isinstance(pipe, Source): 86 self.active_sources.add(pipe) 87 if isinstance(pipe, Drain): 88 self.active_drains.add(pipe) 89 if isinstance(pipe, Sink): 90 self.active_sinks.add(pipe) 91 92 def get_pipe_list(self, pipe): 93 def flatten(p, l): 94 l.add(p) 95 for q in p.sources|p.sinks|p.high_sources|p.high_sinks: 96 if q not in l: 97 flatten(q, l) 98 pl = set() 99 flatten(pipe, pl) 100 return pl 101 102 def _add_pipes(self, *pipes): 103 pl = set() 104 for p in pipes: 105 pl |= self.get_pipe_list(p) 106 pl -= self.active_pipes 107 for q in pl: 108 self.add_one_pipe(q) 109 return pl 110 111 112 def run(self): 113 log_interactive.info("Pipe engine thread started.") 114 try: 115 for p in self.active_pipes: 116 p.start() 117 sources = self.active_sources 118 sources.add(self) 119 exhausted = set([]) 120 RUN=True 121 STOP_IF_EXHAUSTED = False 122 while RUN and (not STOP_IF_EXHAUSTED or len(sources) > 1): 123 fds = select_objects(sources, 2) 124 for fd in fds: 125 if fd is self: 126 cmd = self._read_cmd() 127 if cmd == "X": 128 RUN=False 129 break 130 elif cmd == "B": 131 STOP_IF_EXHAUSTED = True 132 elif cmd == "A": 133 sources = self.active_sources-exhausted 134 sources.add(self) 135 else: 136 warning("Unknown internal pipe engine command: %r. Ignoring." % cmd) 137 elif fd in sources: 138 try: 139 fd.deliver() 140 except Exception as e: 141 log_interactive.exception("piping from %s failed: %s" % (fd.name, e)) 142 else: 143 if fd.exhausted(): 144 exhausted.add(fd) 145 sources.remove(fd) 146 except KeyboardInterrupt: 147 pass 148 finally: 149 try: 150 for p in self.active_pipes: 151 p.stop() 152 finally: 153 self.thread_lock.release() 154 log_interactive.info("Pipe engine thread stopped.") 155 156 def start(self): 157 if self.thread_lock.acquire(0): 158 _t = Thread(target=self.run) 159 _t.setDaemon(True) 160 _t.start() 161 self.thread = _t 162 else: 163 warning("Pipe engine already running") 164 def wait_and_stop(self): 165 self.stop(_cmd="B") 166 def stop(self, _cmd="X"): 167 try: 168 with self.command_lock: 169 if self.thread is not None: 170 self._write_cmd(_cmd) 171 self.thread.join() 172 try: 173 self.thread_lock.release() 174 except: 175 pass 176 else: 177 warning("Pipe engine thread not running") 178 except KeyboardInterrupt: 179 print("Interrupted by user.") 180 181 def add(self, *pipes): 182 pipes = self._add_pipes(*pipes) 183 with self.command_lock: 184 if self.thread is not None: 185 for p in pipes: 186 p.start() 187 self._write_cmd("A") 188 189 def graph(self,**kargs): 190 g=['digraph "pipe" {',"\tnode [shape=rectangle];",] 191 for p in self.active_pipes: 192 g.append('\t"%i" [label="%s"];' % (id(p), p.name)) 193 g.append("") 194 g.append("\tedge [color=blue, arrowhead=vee];") 195 for p in self.active_pipes: 196 for q in p.sinks: 197 g.append('\t"%i" -> "%i";' % (id(p), id(q))) 198 g.append("") 199 g.append("\tedge [color=purple, arrowhead=veevee];") 200 for p in self.active_pipes: 201 for q in p.high_sinks: 202 g.append('\t"%i" -> "%i";' % (id(p), id(q))) 203 g.append("") 204 g.append("\tedge [color=red, arrowhead=diamond];") 205 for p in self.active_pipes: 206 for q in p.trigger_sinks: 207 g.append('\t"%i" -> "%i";' % (id(p), id(q))) 208 g.append('}') 209 graph = "\n".join(g) 210 do_graph(graph, **kargs) 211 212 213 class _ConnectorLogic(object): 214 def __init__(self): 215 self.sources = set() 216 self.sinks = set() 217 self.high_sources = set() 218 self.high_sinks = set() 219 self.trigger_sources = set() 220 self.trigger_sinks = set() 221 222 def __lt__(self, other): 223 other.sinks.add(self) 224 self.sources.add(other) 225 return other 226 def __gt__(self, other): 227 self.sinks.add(other) 228 other.sources.add(self) 229 return other 230 def __eq__(self, other): 231 self > other 232 other > self 233 return other 234 235 def __lshift__(self, other): 236 self.high_sources.add(other) 237 other.high_sinks.add(self) 238 return other 239 def __rshift__(self, other): 240 self.high_sinks.add(other) 241 other.high_sources.add(self) 242 return other 243 def __floordiv__(self, other): 244 self >> other 245 other >> self 246 return other 247 248 def __xor__(self, other): 249 self.trigger_sinks.add(other) 250 other.trigger_sources.add(self) 251 return other 252 253 def __hash__(self): 254 return object.__hash__(self) 255 256 class _PipeMeta(type): 257 def __new__(cls, name, bases, dct): 258 c = type.__new__(cls, name, bases, dct) 259 PipeEngine.pipes[name] = c 260 return c 261 262 class Pipe(six.with_metaclass(_PipeMeta, _ConnectorLogic)): 263 def __init__(self, name=None): 264 _ConnectorLogic.__init__(self) 265 if name is None: 266 name = "%s" % (self.__class__.__name__) 267 self.name = name 268 def _send(self, msg): 269 for s in self.sinks: 270 s.push(msg) 271 def _high_send(self, msg): 272 for s in self.high_sinks: 273 s.high_push(msg) 274 def _trigger(self, msg=None): 275 for s in self.trigger_sinks: 276 s.on_trigger(msg) 277 278 def __repr__(self): 279 ct = conf.color_theme 280 s = "%s%s" % (ct.punct("<"), ct.layer_name(self.name)) 281 if self.sources or self.sinks: 282 s+= " %s" % ct.punct("[") 283 if self.sources: 284 s+="%s%s" % (ct.punct(",").join(ct.field_name(s.name) for s in self.sources), 285 ct.field_value(">")) 286 s += ct.layer_name("#") 287 if self.sinks: 288 s+="%s%s" % (ct.field_value(">"), 289 ct.punct(",").join(ct.field_name(s.name) for s in self.sinks)) 290 s += ct.punct("]") 291 292 if self.high_sources or self.high_sinks: 293 s+= " %s" % ct.punct("[") 294 if self.high_sources: 295 s+="%s%s" % (ct.punct(",").join(ct.field_name(s.name) for s in self.high_sources), 296 ct.field_value(">>")) 297 s += ct.layer_name("#") 298 if self.high_sinks: 299 s+="%s%s" % (ct.field_value(">>"), 300 ct.punct(",").join(ct.field_name(s.name) for s in self.high_sinks)) 301 s += ct.punct("]") 302 303 if self.trigger_sources or self.trigger_sinks: 304 s+= " %s" % ct.punct("[") 305 if self.trigger_sources: 306 s+="%s%s" % (ct.punct(",").join(ct.field_name(s.name) for s in self.trigger_sources), 307 ct.field_value("^")) 308 s += ct.layer_name("#") 309 if self.trigger_sinks: 310 s+="%s%s" % (ct.field_value("^"), 311 ct.punct(",").join(ct.field_name(s.name) for s in self.trigger_sinks)) 312 s += ct.punct("]") 313 314 315 s += ct.punct(">") 316 return s 317 318 class Source(Pipe, SelectableObject): 319 def __init__(self, name=None): 320 Pipe.__init__(self, name=name) 321 self.is_exhausted = False 322 def _read_message(self): 323 return Message() 324 def deliver(self): 325 msg = self._read_message 326 self._send(msg) 327 def fileno(self): 328 return None 329 def check_recv(self): 330 return False 331 def exhausted(self): 332 return self.is_exhausted 333 def start(self): 334 pass 335 def stop(self): 336 pass 337 338 class Drain(Pipe): 339 """Repeat messages from low/high entries to (resp.) low/high exits 340 +-------+ 341 >>-|-------|->> 342 | | 343 >-|-------|-> 344 +-------+ 345 """ 346 def push(self, msg): 347 self._send(msg) 348 def high_push(self, msg): 349 self._high_send(msg) 350 def start(self): 351 pass 352 def stop(self): 353 pass 354 355 class Sink(Pipe): 356 def push(self, msg): 357 pass 358 def high_push(self, msg): 359 pass 360 def start(self): 361 pass 362 def stop(self): 363 pass 364 365 366 class AutoSource(Source, SelectableObject): 367 def __init__(self, name=None): 368 Source.__init__(self, name=name) 369 self.__fdr,self.__fdw = os.pipe() 370 self._queue = collections.deque() 371 def fileno(self): 372 return self.__fdr 373 def check_recv(self): 374 return len(self._queue) > 0 375 def _gen_data(self, msg): 376 self._queue.append((msg,False)) 377 self._wake_up() 378 def _gen_high_data(self, msg): 379 self._queue.append((msg,True)) 380 self._wake_up() 381 def _wake_up(self): 382 os.write(self.__fdw, b"X") 383 self.call_release() 384 def deliver(self): 385 os.read(self.__fdr,1) 386 try: 387 msg,high = self._queue.popleft() 388 except IndexError: #empty queue. Exhausted source 389 pass 390 else: 391 if high: 392 self._high_send(msg) 393 else: 394 self._send(msg) 395 396 class ThreadGenSource(AutoSource): 397 def __init__(self, name=None): 398 AutoSource.__init__(self, name=name) 399 self.RUN = False 400 def generate(self): 401 pass 402 def start(self): 403 self.RUN = True 404 Thread(target=self.generate).start() 405 def stop(self): 406 self.RUN = False 407 408 409 410 class ConsoleSink(Sink): 411 """Print messages on low and high entries 412 +-------+ 413 >>-|--. |->> 414 | print | 415 >-|--' |-> 416 +-------+ 417 """ 418 def push(self, msg): 419 print(">%r" % msg) 420 def high_push(self, msg): 421 print(">>%r" % msg) 422 423 class RawConsoleSink(Sink): 424 """Print messages on low and high entries 425 +-------+ 426 >>-|--. |->> 427 | write | 428 >-|--' |-> 429 +-------+ 430 """ 431 def __init__(self, name=None, newlines=True): 432 Sink.__init__(self, name=name) 433 self.newlines = newlines 434 self._write_pipe = 1 435 def push(self, msg): 436 if self.newlines: 437 msg += "\n" 438 os.write(self._write_pipe, msg.encode("utf8")) 439 def high_push(self, msg): 440 if self.newlines: 441 msg += "\n" 442 os.write(self._write_pipe, msg.encode("utf8")) 443 444 class CLIFeeder(AutoSource): 445 """Send messages from python command line 446 +--------+ 447 >>-| |->> 448 | send() | 449 >-| `----|-> 450 +--------+ 451 """ 452 def send(self, msg): 453 self._gen_data(msg) 454 def close(self): 455 self.is_exhausted = True 456 457 class CLIHighFeeder(CLIFeeder): 458 """Send messages from python command line to high output 459 +--------+ 460 >>-| .----|->> 461 | send() | 462 >-| |-> 463 +--------+ 464 """ 465 def send(self, msg): 466 self._gen_high_data(msg) 467 468 469 class PeriodicSource(ThreadGenSource): 470 """Generage messages periodically on low exit 471 +-------+ 472 >>-| |->> 473 | msg,T | 474 >-| `----|-> 475 +-------+ 476 """ 477 def __init__(self, msg, period, period2=0, name=None): 478 ThreadGenSource.__init__(self,name=name) 479 if not isinstance(msg, (list, set, tuple)): 480 msg=[msg] 481 self.msg = msg 482 self.period = period 483 self.period2 = period2 484 def generate(self): 485 while self.RUN: 486 empty_gen = True 487 for m in self.msg: 488 empty_gen = False 489 self._gen_data(m) 490 time.sleep(self.period) 491 if empty_gen: 492 self.is_exhausted = True 493 self._wake_up() 494 time.sleep(self.period2) 495 496 class TermSink(Sink): 497 """Print messages on low and high entries on a separate terminal 498 +-------+ 499 >>-|--. |->> 500 | print | 501 >-|--' |-> 502 +-------+ 503 """ 504 def __init__(self, name=None, keepterm=True, newlines=True, openearly=True): 505 Sink.__init__(self, name=name) 506 self.keepterm = keepterm 507 self.newlines = newlines 508 self.openearly = openearly 509 self.opened = False 510 if self.openearly: 511 self.start() 512 def _start_windows(self): 513 if not self.opened: 514 self.opened = True 515 self.__f = get_temp_file() 516 open(self.__f, "a").close() 517 self.name = "Scapy" if self.name is None else self.name 518 # Start a powershell in a new window and print the PID 519 cmd = "$app = Start-Process PowerShell -ArgumentList '-command &{$host.ui.RawUI.WindowTitle=\\\"%s\\\";Get-Content \\\"%s\\\" -wait}' -passthru; echo $app.Id" % (self.name, self.__f.replace("\\", "\\\\")) 520 proc = subprocess.Popen([conf.prog.powershell, cmd], stdout=subprocess.PIPE) 521 output, _ = proc.communicate() 522 # This is the process PID 523 self.pid = int(output) 524 print("PID: %d" % self.pid) 525 def _start_unix(self): 526 if not self.opened: 527 self.opened = True 528 rdesc, self.wdesc = os.pipe() 529 cmd = ["xterm"] 530 if self.name is not None: 531 cmd.extend(["-title",self.name]) 532 if self.keepterm: 533 cmd.append("-hold") 534 cmd.extend(["-e", "cat <&%d" % rdesc]) 535 self.proc = subprocess.Popen(cmd, close_fds=False) 536 os.close(rdesc) 537 def start(self): 538 if WINDOWS: 539 return self._start_windows() 540 else: 541 return self._start_unix() 542 def _stop_windows(self): 543 if not self.keepterm: 544 self.opened = False 545 # Recipe to kill process with PID 546 # http://code.activestate.com/recipes/347462-terminating-a-subprocess-on-windows/ 547 import ctypes 548 PROCESS_TERMINATE = 1 549 handle = ctypes.windll.kernel32.OpenProcess(PROCESS_TERMINATE, False, self.pid) 550 ctypes.windll.kernel32.TerminateProcess(handle, -1) 551 ctypes.windll.kernel32.CloseHandle(handle) 552 def _stop_unix(self): 553 if not self.keepterm: 554 self.opened = False 555 self.proc.kill() 556 self.proc.wait() 557 def stop(self): 558 if WINDOWS: 559 return self._stop_windows() 560 else: 561 return self._stop_unix() 562 def _print(self, s): 563 if self.newlines: 564 s+="\n" 565 if WINDOWS: 566 wdesc = open(self.__f, "a") 567 wdesc.write(s) 568 wdesc.close() 569 else: 570 os.write(self.wdesc, s.encode()) 571 def push(self, msg): 572 self._print(str(msg)) 573 def high_push(self, msg): 574 self._print(str(msg)) 575 576 577 class QueueSink(Sink): 578 """Collect messages from high and low entries and queue them. Messages are unqueued with the .recv() method. 579 +-------+ 580 >>-|--. |->> 581 | queue | 582 >-|--' |-> 583 +-------+ 584 """ 585 def __init__(self, name=None): 586 Sink.__init__(self, name=name) 587 self.q = six.moves.queue.Queue() 588 def push(self, msg): 589 self.q.put(msg) 590 def high_push(self, msg): 591 self.q.put(msg) 592 def recv(self): 593 while True: 594 try: 595 return self.q.get(True, timeout=0.1) 596 except six.moves.queue.Empty: 597 pass 598 599 600 class TransformDrain(Drain): 601 """Apply a function to messages on low and high entry 602 +-------+ 603 >>-|--[f]--|->> 604 | | 605 >-|--[f]--|-> 606 +-------+ 607 """ 608 def __init__(self, f, name=None): 609 Drain.__init__(self, name=name) 610 self.f = f 611 def push(self, msg): 612 self._send(self.f(msg)) 613 def high_push(self, msg): 614 self._high_send(self.f(msg)) 615 616 class UpDrain(Drain): 617 """Repeat messages from low entry to high exit 618 +-------+ 619 >>-| ,--|->> 620 | / | 621 >-|--' |-> 622 +-------+ 623 """ 624 def push(self, msg): 625 self._high_send(msg) 626 def high_push(self, msg): 627 pass 628 629 class DownDrain(Drain): 630 """Repeat messages from high entry to low exit 631 +-------+ 632 >>-|--. |->> 633 | \ | 634 >-| `--|-> 635 +-------+ 636 """ 637 def push(self, msg): 638 pass 639 def high_push(self, msg): 640 self._send(msg) 641