Home | History | Annotate | Download | only in scapy
      1 #! /usr/bin/env python
      2 
      3 ## This file is part of Scapy
      4 ## See http://www.secdev.org/projects/scapy for more informations
      5 ## Copyright (C) Philippe Biondi <phil (at] secdev.org>
      6 ## This program is published under a GPLv2 license
      7 
      8 from __future__ import print_function
      9 import os
     10 import subprocess
     11 import itertools
     12 import collections
     13 import time
     14 import scapy.modules.six as six
     15 from threading import Lock, Thread
     16 import scapy.utils
     17 
     18 from scapy.automaton import Message, select_objects, SelectableObject
     19 from scapy.consts import WINDOWS
     20 from scapy.error import log_interactive, warning
     21 from scapy.config import conf
     22 from scapy.utils import get_temp_file, do_graph
     23 
     24 import scapy.arch
     25 
     26 class PipeEngine(SelectableObject):
     27     pipes = {}
     28     @classmethod
     29     def list_pipes(cls):
     30         for pn,pc in sorted(cls.pipes.items()):
     31             doc = pc.__doc__ or ""
     32             if doc:
     33                 doc = doc.splitlines()[0]
     34             print("%20s: %s" % (pn, doc))
     35     @classmethod
     36     def list_pipes_detailed(cls):
     37         for pn,pc in sorted(cls.pipes.items()):
     38             if pc.__doc__:
     39                 print("###### %s\n %s" % (pn ,pc.__doc__))
     40             else:
     41                 print("###### %s" % pn)
     42     
     43     def __init__(self, *pipes):
     44         self.active_pipes = set()
     45         self.active_sources = set()
     46         self.active_drains = set()
     47         self.active_sinks = set()
     48         self._add_pipes(*pipes)
     49         self.thread_lock = Lock()
     50         self.command_lock = Lock()
     51         self.__fd_queue = collections.deque()
     52         self.__fdr,self.__fdw = os.pipe()
     53         self.thread = None
     54     def __getattr__(self, attr):
     55         if attr.startswith("spawn_"):
     56             dname = attr[6:]
     57             if dname in self.pipes:
     58                 def f(*args, **kargs):
     59                     k = self.pipes[dname]
     60                     p = k(*args, **kargs)
     61                     self.add(p)
     62                     return p
     63                 return f
     64         raise AttributeError(attr)
     65 
     66     def check_recv(self):
     67         """As select.select is not available, we check if there
     68         is some data to read by using a list that stores pointers."""
     69         return len(self.__fd_queue) > 0
     70 
     71     def fileno(self):
     72         return self.__fdr
     73 
     74     def _read_cmd(self):
     75         os.read(self.__fdr,1)
     76         return self.__fd_queue.popleft()
     77 
     78     def _write_cmd(self, _cmd):
     79         self.__fd_queue.append(_cmd)
     80         os.write(self.__fdw, b"X")
     81         self.call_release()
     82 
     83     def add_one_pipe(self, pipe):
     84         self.active_pipes.add(pipe)
     85         if isinstance(pipe, Source):
     86             self.active_sources.add(pipe)
     87         if isinstance(pipe, Drain):
     88             self.active_drains.add(pipe)
     89         if isinstance(pipe, Sink):
     90             self.active_sinks.add(pipe)
     91 
     92     def get_pipe_list(self, pipe):
     93         def flatten(p, l):
     94             l.add(p)
     95             for q in p.sources|p.sinks|p.high_sources|p.high_sinks:
     96                 if q not in l:
     97                     flatten(q, l)
     98         pl = set()
     99         flatten(pipe, pl)
    100         return pl
    101 
    102     def _add_pipes(self, *pipes):
    103         pl = set()
    104         for p in pipes:
    105             pl |= self.get_pipe_list(p)
    106         pl -= self.active_pipes
    107         for q in pl:
    108             self.add_one_pipe(q)
    109         return pl
    110             
    111 
    112     def run(self):
    113         log_interactive.info("Pipe engine thread started.")
    114         try:
    115             for p in self.active_pipes:
    116                 p.start()
    117             sources = self.active_sources
    118             sources.add(self)
    119             exhausted = set([])
    120             RUN=True
    121             STOP_IF_EXHAUSTED = False
    122             while RUN and (not STOP_IF_EXHAUSTED or len(sources) > 1):
    123                 fds = select_objects(sources, 2)
    124                 for fd in fds:
    125                     if fd is self:
    126                         cmd = self._read_cmd()
    127                         if cmd == "X":
    128                             RUN=False
    129                             break
    130                         elif cmd == "B":
    131                             STOP_IF_EXHAUSTED = True
    132                         elif cmd == "A":
    133                             sources = self.active_sources-exhausted
    134                             sources.add(self)
    135                         else:
    136                             warning("Unknown internal pipe engine command: %r. Ignoring." % cmd)
    137                     elif fd in sources:
    138                         try:
    139                             fd.deliver()
    140                         except Exception as e:
    141                             log_interactive.exception("piping from %s failed: %s" % (fd.name, e))
    142                         else:
    143                             if fd.exhausted():
    144                                 exhausted.add(fd)
    145                                 sources.remove(fd)
    146         except KeyboardInterrupt:
    147             pass
    148         finally:
    149             try:
    150                 for p in self.active_pipes:
    151                     p.stop()
    152             finally:
    153                 self.thread_lock.release()
    154                 log_interactive.info("Pipe engine thread stopped.")
    155 
    156     def start(self):
    157         if self.thread_lock.acquire(0):
    158             _t = Thread(target=self.run)
    159             _t.setDaemon(True)
    160             _t.start()
    161             self.thread = _t
    162         else:
    163             warning("Pipe engine already running")
    164     def wait_and_stop(self):
    165         self.stop(_cmd="B")
    166     def stop(self, _cmd="X"):
    167         try:
    168             with self.command_lock:
    169                 if self.thread is not None:
    170                     self._write_cmd(_cmd)
    171                     self.thread.join()
    172                     try:
    173                         self.thread_lock.release()
    174                     except:
    175                         pass
    176                 else:
    177                     warning("Pipe engine thread not running")
    178         except KeyboardInterrupt:
    179             print("Interrupted by user.")
    180 
    181     def add(self, *pipes):
    182         pipes = self._add_pipes(*pipes)
    183         with self.command_lock:
    184             if self.thread is not None:
    185                 for p in pipes:
    186                     p.start()
    187                 self._write_cmd("A")
    188     
    189     def graph(self,**kargs):
    190         g=['digraph "pipe" {',"\tnode [shape=rectangle];",]
    191         for p in self.active_pipes:
    192             g.append('\t"%i" [label="%s"];' % (id(p), p.name))
    193         g.append("")
    194         g.append("\tedge [color=blue, arrowhead=vee];")
    195         for p in self.active_pipes:
    196             for q in p.sinks:
    197                 g.append('\t"%i" -> "%i";' % (id(p), id(q)))
    198         g.append("")
    199         g.append("\tedge [color=purple, arrowhead=veevee];")
    200         for p in self.active_pipes:
    201             for q in p.high_sinks:
    202                 g.append('\t"%i" -> "%i";' % (id(p), id(q)))
    203         g.append("")
    204         g.append("\tedge [color=red, arrowhead=diamond];")
    205         for p in self.active_pipes:
    206             for q in p.trigger_sinks:
    207                 g.append('\t"%i" -> "%i";' % (id(p), id(q)))
    208         g.append('}')
    209         graph = "\n".join(g)
    210         do_graph(graph, **kargs) 
    211 
    212 
    213 class _ConnectorLogic(object):
    214     def __init__(self):
    215         self.sources = set()
    216         self.sinks = set()
    217         self.high_sources = set()
    218         self.high_sinks = set()
    219         self.trigger_sources = set()
    220         self.trigger_sinks = set()
    221 
    222     def __lt__(self, other):
    223         other.sinks.add(self)
    224         self.sources.add(other)
    225         return other
    226     def __gt__(self, other):
    227         self.sinks.add(other)
    228         other.sources.add(self)
    229         return other
    230     def __eq__(self, other):
    231         self > other
    232         other > self
    233         return other
    234 
    235     def __lshift__(self, other):
    236         self.high_sources.add(other)
    237         other.high_sinks.add(self)
    238         return other
    239     def __rshift__(self, other):
    240         self.high_sinks.add(other)
    241         other.high_sources.add(self)
    242         return other
    243     def __floordiv__(self, other):
    244         self >> other
    245         other >> self
    246         return other
    247 
    248     def __xor__(self, other):
    249         self.trigger_sinks.add(other)
    250         other.trigger_sources.add(self)
    251         return other
    252 
    253     def __hash__(self):
    254         return object.__hash__(self)
    255 
    256 class _PipeMeta(type):
    257     def __new__(cls, name, bases, dct):
    258         c = type.__new__(cls, name, bases, dct)
    259         PipeEngine.pipes[name] = c
    260         return c
    261 
    262 class Pipe(six.with_metaclass(_PipeMeta, _ConnectorLogic)):
    263     def __init__(self, name=None):
    264         _ConnectorLogic.__init__(self)
    265         if name is None:
    266             name = "%s" % (self.__class__.__name__)
    267         self.name = name
    268     def _send(self, msg):
    269         for s in self.sinks:
    270             s.push(msg)
    271     def _high_send(self, msg):
    272         for s in self.high_sinks:
    273             s.high_push(msg)
    274     def _trigger(self, msg=None):
    275         for s in self.trigger_sinks:
    276             s.on_trigger(msg)
    277 
    278     def __repr__(self):
    279         ct = conf.color_theme
    280         s = "%s%s" % (ct.punct("<"), ct.layer_name(self.name))
    281         if self.sources or self.sinks:
    282             s+= " %s" % ct.punct("[")
    283             if self.sources:
    284                 s+="%s%s" %  (ct.punct(",").join(ct.field_name(s.name) for s in self.sources),
    285                               ct.field_value(">"))
    286             s += ct.layer_name("#")
    287             if self.sinks:
    288                 s+="%s%s" % (ct.field_value(">"),
    289                              ct.punct(",").join(ct.field_name(s.name) for s in self.sinks))
    290             s += ct.punct("]")
    291 
    292         if self.high_sources or self.high_sinks:
    293             s+= " %s" % ct.punct("[")
    294             if self.high_sources:
    295                 s+="%s%s" %  (ct.punct(",").join(ct.field_name(s.name) for s in self.high_sources),
    296                               ct.field_value(">>"))
    297             s += ct.layer_name("#")
    298             if self.high_sinks:
    299                 s+="%s%s" % (ct.field_value(">>"),
    300                              ct.punct(",").join(ct.field_name(s.name) for s in self.high_sinks))
    301             s += ct.punct("]")
    302 
    303         if self.trigger_sources or self.trigger_sinks:
    304             s+= " %s" % ct.punct("[")
    305             if self.trigger_sources:
    306                 s+="%s%s" %  (ct.punct(",").join(ct.field_name(s.name) for s in self.trigger_sources),
    307                               ct.field_value("^"))
    308             s += ct.layer_name("#")
    309             if self.trigger_sinks:
    310                 s+="%s%s" % (ct.field_value("^"),
    311                              ct.punct(",").join(ct.field_name(s.name) for s in self.trigger_sinks))
    312             s += ct.punct("]")
    313 
    314 
    315         s += ct.punct(">")
    316         return s
    317 
    318 class Source(Pipe, SelectableObject):
    319     def __init__(self, name=None):
    320         Pipe.__init__(self, name=name)
    321         self.is_exhausted = False
    322     def _read_message(self):
    323         return Message()
    324     def deliver(self):
    325         msg = self._read_message
    326         self._send(msg)
    327     def fileno(self):
    328         return None
    329     def check_recv(self):
    330         return False
    331     def exhausted(self):
    332         return self.is_exhausted
    333     def start(self):
    334         pass
    335     def stop(self):
    336         pass
    337 
    338 class Drain(Pipe):
    339     """Repeat messages from low/high entries to (resp.) low/high exits
    340      +-------+
    341   >>-|-------|->>
    342      |       |
    343    >-|-------|->
    344      +-------+
    345 """
    346     def push(self, msg):
    347         self._send(msg)
    348     def high_push(self, msg):
    349         self._high_send(msg)
    350     def start(self):
    351         pass
    352     def stop(self):
    353         pass
    354 
    355 class Sink(Pipe):
    356     def push(self, msg):
    357         pass
    358     def high_push(self, msg):
    359         pass
    360     def start(self):
    361         pass
    362     def stop(self):
    363         pass
    364 
    365 
    366 class AutoSource(Source, SelectableObject):
    367     def __init__(self, name=None):
    368         Source.__init__(self, name=name)
    369         self.__fdr,self.__fdw = os.pipe()
    370         self._queue = collections.deque()
    371     def fileno(self):
    372         return self.__fdr
    373     def check_recv(self):
    374         return len(self._queue) > 0
    375     def _gen_data(self, msg):
    376         self._queue.append((msg,False))
    377         self._wake_up()
    378     def _gen_high_data(self, msg):
    379         self._queue.append((msg,True))
    380         self._wake_up()
    381     def _wake_up(self):
    382         os.write(self.__fdw, b"X")
    383         self.call_release()
    384     def deliver(self):
    385         os.read(self.__fdr,1)
    386         try:
    387             msg,high = self._queue.popleft()
    388         except IndexError: #empty queue. Exhausted source
    389             pass
    390         else:
    391             if high:
    392                 self._high_send(msg)
    393             else:
    394                 self._send(msg)
    395 
    396 class ThreadGenSource(AutoSource):
    397     def __init__(self, name=None):
    398         AutoSource.__init__(self, name=name)
    399         self.RUN = False
    400     def generate(self):
    401         pass
    402     def start(self):
    403         self.RUN = True
    404         Thread(target=self.generate).start()
    405     def stop(self):
    406         self.RUN = False
    407 
    408 
    409         
    410 class ConsoleSink(Sink):
    411     """Print messages on low and high entries
    412      +-------+
    413   >>-|--.    |->>
    414      | print |
    415    >-|--'    |->
    416      +-------+
    417 """
    418     def push(self, msg):
    419         print(">%r" % msg)
    420     def high_push(self, msg):
    421         print(">>%r" % msg)
    422 
    423 class RawConsoleSink(Sink):
    424     """Print messages on low and high entries
    425      +-------+
    426   >>-|--.    |->>
    427      | write |
    428    >-|--'    |->
    429      +-------+
    430 """
    431     def __init__(self, name=None, newlines=True):
    432         Sink.__init__(self, name=name)
    433         self.newlines = newlines
    434         self._write_pipe = 1
    435     def push(self, msg):
    436         if self.newlines:
    437             msg += "\n"
    438         os.write(self._write_pipe, msg.encode("utf8"))
    439     def high_push(self, msg):
    440         if self.newlines:
    441             msg += "\n"
    442         os.write(self._write_pipe, msg.encode("utf8"))
    443 
    444 class CLIFeeder(AutoSource):
    445     """Send messages from python command line
    446      +--------+
    447   >>-|        |->>
    448      | send() |
    449    >-|   `----|->
    450      +--------+
    451 """
    452     def send(self, msg):
    453         self._gen_data(msg)
    454     def close(self):
    455         self.is_exhausted = True
    456 
    457 class CLIHighFeeder(CLIFeeder):
    458     """Send messages from python command line to high output
    459      +--------+
    460   >>-|   .----|->>
    461      | send() |
    462    >-|        |->
    463      +--------+
    464 """
    465     def send(self, msg):
    466         self._gen_high_data(msg)
    467 
    468 
    469 class PeriodicSource(ThreadGenSource):
    470     """Generage messages periodically on low exit
    471      +-------+
    472   >>-|       |->>
    473      | msg,T |
    474    >-|  `----|->
    475      +-------+
    476 """
    477     def __init__(self, msg, period, period2=0, name=None):
    478         ThreadGenSource.__init__(self,name=name)
    479         if not isinstance(msg, (list, set, tuple)):
    480             msg=[msg]
    481         self.msg = msg
    482         self.period = period
    483         self.period2 = period2
    484     def generate(self):
    485         while self.RUN:
    486             empty_gen = True
    487             for m in self.msg:
    488                 empty_gen = False
    489                 self._gen_data(m)
    490                 time.sleep(self.period)
    491             if empty_gen:
    492                 self.is_exhausted = True
    493                 self._wake_up()
    494             time.sleep(self.period2)
    495         
    496 class TermSink(Sink):
    497     """Print messages on low and high entries on a separate terminal
    498      +-------+
    499   >>-|--.    |->>
    500      | print |
    501    >-|--'    |->
    502      +-------+
    503 """
    504     def __init__(self, name=None, keepterm=True, newlines=True, openearly=True):
    505         Sink.__init__(self, name=name)
    506         self.keepterm = keepterm
    507         self.newlines = newlines
    508         self.openearly = openearly
    509         self.opened = False
    510         if self.openearly:
    511             self.start()
    512     def _start_windows(self):
    513         if not self.opened:
    514             self.opened = True
    515             self.__f = get_temp_file()
    516             open(self.__f, "a").close()
    517             self.name = "Scapy" if self.name is None else self.name
    518             # Start a powershell in a new window and print the PID
    519             cmd = "$app = Start-Process PowerShell -ArgumentList '-command &{$host.ui.RawUI.WindowTitle=\\\"%s\\\";Get-Content \\\"%s\\\" -wait}' -passthru; echo $app.Id" % (self.name, self.__f.replace("\\", "\\\\"))
    520             proc = subprocess.Popen([conf.prog.powershell, cmd], stdout=subprocess.PIPE)
    521             output, _ = proc.communicate()
    522             # This is the process PID
    523             self.pid = int(output)
    524             print("PID: %d" % self.pid)
    525     def _start_unix(self):
    526         if not self.opened:
    527             self.opened = True
    528             rdesc, self.wdesc = os.pipe()
    529             cmd = ["xterm"]
    530             if self.name is not None:
    531                 cmd.extend(["-title",self.name])
    532             if self.keepterm:
    533                 cmd.append("-hold")
    534             cmd.extend(["-e", "cat <&%d" % rdesc])
    535             self.proc = subprocess.Popen(cmd, close_fds=False)
    536             os.close(rdesc)
    537     def start(self):
    538         if WINDOWS:
    539             return self._start_windows()
    540         else:
    541             return self._start_unix()
    542     def _stop_windows(self):
    543         if not self.keepterm:
    544             self.opened = False
    545             # Recipe to kill process with PID
    546             # http://code.activestate.com/recipes/347462-terminating-a-subprocess-on-windows/
    547             import ctypes
    548             PROCESS_TERMINATE = 1
    549             handle = ctypes.windll.kernel32.OpenProcess(PROCESS_TERMINATE, False, self.pid)
    550             ctypes.windll.kernel32.TerminateProcess(handle, -1)
    551             ctypes.windll.kernel32.CloseHandle(handle)
    552     def _stop_unix(self):
    553         if not self.keepterm:
    554             self.opened = False
    555             self.proc.kill()
    556             self.proc.wait()
    557     def stop(self):
    558         if WINDOWS:
    559             return self._stop_windows()
    560         else:
    561             return self._stop_unix()
    562     def _print(self, s):
    563         if self.newlines:
    564             s+="\n"
    565         if WINDOWS:
    566             wdesc = open(self.__f, "a")
    567             wdesc.write(s)
    568             wdesc.close()
    569         else:
    570             os.write(self.wdesc, s.encode())
    571     def push(self, msg):
    572         self._print(str(msg))
    573     def high_push(self, msg):
    574         self._print(str(msg))
    575     
    576 
    577 class QueueSink(Sink):
    578     """Collect messages from high and low entries and queue them. Messages are unqueued with the .recv() method.
    579      +-------+
    580   >>-|--.    |->>
    581      | queue |
    582    >-|--'    |->
    583      +-------+
    584 """
    585     def __init__(self, name=None):
    586         Sink.__init__(self, name=name)
    587         self.q = six.moves.queue.Queue()
    588     def push(self, msg):
    589         self.q.put(msg)
    590     def high_push(self, msg):
    591         self.q.put(msg)
    592     def recv(self):
    593         while True:
    594             try:
    595                 return self.q.get(True, timeout=0.1)
    596             except six.moves.queue.Empty:
    597                 pass
    598 
    599 
    600 class TransformDrain(Drain):
    601     """Apply a function to messages on low and high entry
    602      +-------+
    603   >>-|--[f]--|->>
    604      |       |
    605    >-|--[f]--|->
    606      +-------+
    607 """
    608     def __init__(self, f, name=None):
    609         Drain.__init__(self, name=name)
    610         self.f = f
    611     def push(self, msg):
    612         self._send(self.f(msg))
    613     def high_push(self, msg):
    614         self._high_send(self.f(msg))
    615 
    616 class UpDrain(Drain):
    617     """Repeat messages from low entry to high exit
    618      +-------+
    619   >>-|    ,--|->>
    620      |   /   |
    621    >-|--'    |->
    622      +-------+
    623 """
    624     def push(self, msg):
    625         self._high_send(msg)
    626     def high_push(self, msg):
    627         pass
    628 
    629 class DownDrain(Drain):
    630     """Repeat messages from high entry to low exit
    631      +-------+
    632   >>-|--.    |->>
    633      |   \   |
    634    >-|    `--|->
    635      +-------+
    636 """
    637     def push(self, msg):
    638         pass
    639     def high_push(self, msg):
    640         self._send(msg)
    641