Home | History | Annotate | Download | only in test
      1 ########################
      2 % Pipetool related tests
      3 ########################
      4 
      5 + Basic tests
      6 
      7 = Test default test case
      8 
      9 s = PeriodicSource("hello", 1, name="src")
     10 d1 = Drain(name="d1")
     11 c = ConsoleSink(name="c")
     12 tf = TransformDrain(lambda x: "Got %s" % x)
     13 t = TermSink(name="PipeToolsPeriodicTest", keepterm=False)
     14 s > d1 > c
     15 d1 > tf > t
     16 
     17 p = PipeEngine(s)
     18 p.start()
     19 time.sleep(3)
     20 s.msg = []
     21 p.stop()
     22 
     23 try:
     24     os.remove("test.png")
     25 except OSError:
     26     pass
     27 
     28 = Test add_pipe
     29 
     30 s = AutoSource()
     31 p = PipeEngine(s)
     32 p.add(Pipe())
     33 assert len(p.active_pipes) == 2
     34 
     35 x = p.spawn_Pipe()
     36 assert len(p.active_pipes) == 3
     37 assert isinstance(x, Pipe)
     38 
     39 = Test exhausted source
     40 
     41 s = AutoSource()
     42 s._gen_data("hello")
     43 s.is_exhausted = True
     44 d1 = Drain(name="d1")
     45 c = ConsoleSink(name="c")
     46 s > d1 > c
     47 
     48 p = PipeEngine(s)
     49 p.start()
     50 p.wait_and_stop()
     51 
     52 = Test add_pipe on running instance
     53 
     54 p = PipeEngine()
     55 p.start()
     56 
     57 s = CLIFeeder()
     58 
     59 d1 = Drain(name="d1")
     60 c = QueueSink(name="c")
     61 s > d1 > c
     62 
     63 p.add(s)
     64 
     65 s.send("hello")
     66 s.send("hi")
     67 
     68 assert c.q.get(timeout=5) == "hello"
     69 assert c.q.get(timeout=5) == "hi"
     70 
     71 p.stop()
     72 
     73 = Test Operators
     74 
     75 s = AutoSource()
     76 p = PipeEngine(s)
     77 assert p == p
     78 
     79 a = AutoSource()
     80 b = AutoSource()
     81 a >> b
     82 assert len(a.high_sinks) == 1
     83 assert len(a.high_sources) == 0
     84 assert len(b.high_sinks) == 0
     85 assert len(b.high_sources) == 1
     86 a
     87 b
     88 
     89 a = AutoSource()
     90 b = AutoSource()
     91 a << b
     92 assert len(a.high_sinks) == 0
     93 assert len(a.high_sources) == 1
     94 assert len(b.high_sinks) == 1
     95 assert len(b.high_sources) == 0
     96 a
     97 b
     98 
     99 a = AutoSource()
    100 b = AutoSource()
    101 a == b
    102 assert len(a.sinks) == 1
    103 assert len(a.sources) == 1
    104 assert len(b.sinks) == 1
    105 assert len(b.sources) == 1
    106 
    107 a = AutoSource()
    108 b = AutoSource()
    109 a//b
    110 assert len(a.high_sinks) == 1
    111 assert len(a.high_sources) == 1
    112 assert len(b.high_sinks) == 1
    113 assert len(b.high_sources) == 1
    114 
    115 a = AutoSource()
    116 b = AutoSource()
    117 a^b
    118 assert len(b.trigger_sources) == 1
    119 assert len(a.trigger_sinks) == 1
    120 
    121 = Test doc
    122 
    123 s = AutoSource()
    124 p = PipeEngine(s)
    125 p.list_pipes()
    126 p.list_pipes_detailed()
    127 
    128 = Test RawConsoleSink with CLIFeeder
    129 
    130 p = PipeEngine()
    131 
    132 s = CLIFeeder()
    133 s.send("hello")
    134 s.is_exhausted = True
    135 
    136 r, w = os.pipe()
    137 
    138 d1 = Drain(name="d1")
    139 c = RawConsoleSink(name="c")
    140 c._write_pipe = w
    141 s > d1 > c
    142 
    143 p.add(s)
    144 p.start()
    145 
    146 assert os.read(r, 20) == b"hello\n"
    147 p.wait_and_stop()
    148 
    149 = Test QueueSink with CLIFeeder
    150 
    151 p = PipeEngine()
    152 
    153 s = CLIFeeder()
    154 s.send("hello")
    155 s.is_exhausted = True
    156 
    157 d1 = Drain(name="d1")
    158 c = QueueSink(name="c")
    159 s > d1 > c
    160 
    161 p.add(s)
    162 p.start()
    163 
    164 p.wait_and_stop()
    165 assert c.recv() == "hello"
    166 
    167 = Test UpDrain
    168 
    169 test_val = None
    170 
    171 class TestSink(Sink):
    172     def high_push(self, msg):
    173         global test_val
    174         test_val = msg
    175 
    176 p = PipeEngine()
    177 
    178 s = CLIFeeder()
    179 s.send("hello")
    180 s.is_exhausted = True
    181 
    182 d1 = UpDrain(name="d1")
    183 c = TestSink(name="c")
    184 s > d1
    185 d1 >> c
    186 
    187 p.add(s)
    188 p.start()
    189 
    190 p.wait_and_stop()
    191 assert test_val == "hello"
    192 
    193 = Test DownDrain
    194 
    195 test_val = None
    196 
    197 class TestSink(Sink):
    198     def push(self, msg):
    199         global test_val
    200         test_val = msg
    201 
    202 p = PipeEngine()
    203 
    204 s = CLIHighFeeder()
    205 s.send("hello")
    206 s.is_exhausted = True
    207 
    208 d1 = DownDrain(name="d1")
    209 c = TestSink(name="c")
    210 s >> d1
    211 d1 > c
    212 
    213 p.add(s)
    214 p.start()
    215 
    216 p.wait_and_stop()
    217 assert test_val == "hello"
    218 
    219 + Advanced ScapyPipes pipetools tests
    220 
    221 = Test SniffSource
    222 ~ netaccess
    223 
    224 p = PipeEngine()
    225 
    226 s = SniffSource()
    227 d1 = Drain(name="d1")
    228 c = QueueSink(name="c")
    229 s > d1 > c
    230 
    231 p.add(s)
    232 p.start()
    233 sniff(count=3)
    234 p.stop()
    235 assert c.q.get()
    236 
    237 = Test exhausted AutoSource and SniffSource
    238 
    239 import mock
    240 from scapy.error import Scapy_Exception
    241 
    242 def _fail():
    243     raise Scapy_Exception()
    244 
    245 a = AutoSource()
    246 a._send = mock.MagicMock(side_effect=_fail)
    247 a._wake_up()
    248 try:
    249     a.deliver()
    250 except:
    251     pass
    252 
    253 s = SniffSource()
    254 s.s = mock.MagicMock()
    255 s.s.recv = mock.MagicMock(side_effect=_fail)
    256 try:
    257     s.deliver()
    258 except:
    259     pass
    260 
    261 = Test RdpcapSource and WrpcapSink
    262 ~ needs_root
    263 
    264 req = Ether()/IP()/ICMP()
    265 rpy = Ether()/IP('E\x00\x00\x1c\x00\x00\x00\x004\x01\x1d\x04\xd8:\xd0\x83\xc0\xa8\x00w\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
    266 
    267 wrpcap("t.pcap", [req, rpy])
    268 
    269 p = PipeEngine()
    270 
    271 s = RdpcapSource("t.pcap")
    272 d1 = Drain(name="d1")
    273 c = WrpcapSink("t2.pcap", name="c")
    274 s > d1 > c
    275 p.add(s)
    276 p.start()
    277 p.wait_and_stop()
    278 
    279 results = rdpcap("t2.pcap")
    280 
    281 assert raw(results[0]) == raw(req)
    282 assert raw(results[1]) == raw(rpy)
    283 
    284 os.unlink("t.pcap")
    285 os.unlink("t2.pcap")
    286 
    287 = Test InjectSink and Inject3Sink
    288 ~ needs_root
    289 
    290 import mock
    291 
    292 a = IP(dst="192.168.0.1")/ICMP()
    293 msgs = []
    294 
    295 class FakeSocket(object):
    296     def __init__(self, *arg, **karg):
    297         pass
    298     def close(self):
    299         pass
    300     def send(self, msg):
    301         global msgs
    302         msgs.append(msg)
    303 
    304 @mock.patch("scapy.scapypipes.conf.L2socket", FakeSocket)
    305 @mock.patch("scapy.scapypipes.conf.L3socket", FakeSocket)
    306 def _inject_sink(i3):
    307     s = CLIFeeder()
    308     s.send(a)
    309     s.is_exhausted = True
    310     d1 = Drain(name="d1")
    311     c = Inject3Sink() if i3 else InjectSink()
    312     s > d1 > c
    313     p = PipeEngine(s)
    314     p.start()
    315     p.wait_and_stop()
    316 
    317 _inject_sink(False) # InjectSink
    318 _inject_sink(True) # Inject3Sink
    319 
    320 assert msgs == [a,a]
    321 
    322 = TriggerDrain and TriggeredValve with CLIFeeder
    323 
    324 s = CLIFeeder()
    325 d1 = TriggerDrain(lambda x:x=="trigger")
    326 d2 = TriggeredValve()
    327 c = QueueSink()
    328 
    329 s > d1 > d2 > c
    330 d1 ^ d2
    331 
    332 p = PipeEngine(s)
    333 p.start()
    334 
    335 s.send("hello")
    336 s.send("trigger")
    337 s.send("hello2")
    338 s.send("trigger")
    339 s.send("hello3")
    340 
    341 assert c.q.get(timeout=5) == "hello"
    342 assert c.q.get(timeout=5) == "trigger"
    343 assert c.q.get(timeout=5) == "hello3"
    344 
    345 p.stop()
    346 
    347 = TriggerDrain and TriggeredValve with CLIHighFeeder
    348 
    349 s = CLIHighFeeder()
    350 d1 = TriggerDrain(lambda x:x=="trigger")
    351 d2 = TriggeredValve()
    352 c = QueueSink()
    353 
    354 s >> d1
    355 d1 >> d2
    356 d2 >> c
    357 d1 ^ d2
    358 
    359 p = PipeEngine(s)
    360 p.start()
    361 
    362 s.send("hello")
    363 s.send("trigger")
    364 s.send("hello2")
    365 s.send("trigger")
    366 s.send("hello3")
    367 
    368 assert c.q.get(timeout=5) == "hello"
    369 assert c.q.get(timeout=5) == "trigger"
    370 assert c.q.get(timeout=5) == "hello3"
    371 
    372 p.stop()
    373 
    374 = TriggerDrain and TriggeredQueueingValve with CLIFeeder
    375 
    376 s = CLIFeeder()
    377 d1 = TriggerDrain(lambda x:x=="trigger")
    378 d2 = TriggeredValve()
    379 c = QueueSink()
    380 
    381 s > d1 > d2 > c
    382 d1 ^ d2
    383 
    384 p = PipeEngine(s)
    385 p.start()
    386 
    387 s.send("hello")
    388 s.send("trigger")
    389 s.send("hello2")
    390 s.send("trigger")
    391 s.send("hello3")
    392 
    393 assert c.q.get(timeout=5) == "hello"
    394 assert c.q.get(timeout=5) == "trigger"
    395 assert c.q.get(timeout=5) == "hello3"
    396 
    397 p.stop()
    398 
    399 = TriggerDrain and TriggeredSwitch with CLIFeeder on high channel
    400 
    401 s = CLIFeeder()
    402 d1 = TriggerDrain(lambda x:x=="trigger")
    403 d2 = TriggeredSwitch()
    404 c = QueueSink()
    405 
    406 s > d1 > d2
    407 d2 >> c
    408 d1 ^ d2
    409 
    410 p = PipeEngine(s)
    411 p.start()
    412 
    413 s.send("hello")
    414 s.send("trigger")
    415 s.send("hello2")
    416 s.send("trigger")
    417 s.send("hello3")
    418 
    419 assert c.q.get(timeout=5) == "trigger"
    420 assert c.q.get(timeout=5) == "hello2"
    421 
    422 p.stop()
    423 
    424 = TriggerDrain and TriggeredSwitch with CLIHighFeeder on low channel
    425 
    426 s = CLIHighFeeder()
    427 d1 = TriggerDrain(lambda x:x=="trigger")
    428 d2 = TriggeredSwitch()
    429 c = QueueSink()
    430 
    431 s >> d1
    432 d1 >> d2
    433 d2 > c
    434 d1 ^ d2
    435 
    436 p = PipeEngine(s)
    437 p.start()
    438 
    439 s.send("hello")
    440 s.send("trigger")
    441 s.send("hello2")
    442 s.send("trigger")
    443 s.send("hello3")
    444 
    445 assert c.q.get(timeout=5) == "hello"
    446 assert c.q.get(timeout=5) == "trigger"
    447 assert c.q.get(timeout=5) == "hello3"
    448 
    449 p.stop()
    450 
    451 = TriggerDrain and TriggeredMessage
    452 
    453 s = CLIFeeder()
    454 d1 = TriggerDrain(lambda x:x=="trigger")
    455 d2 = TriggeredMessage("hello")
    456 c = QueueSink()
    457 
    458 s > d1 > d2 > c
    459 d1 ^ d2
    460 
    461 p = PipeEngine(s)
    462 p.start()
    463 
    464 s.send("trigger")
    465 
    466 r = [c.q.get(timeout=5), c.q.get(timeout=5)]
    467 assert "hello" in r
    468 assert "trigger" in r
    469 
    470 p.stop()
    471 
    472 = TriggerDrain and TriggeredQueueingValve on low channel
    473 
    474 p = PipeEngine()
    475 
    476 s = CLIFeeder()
    477 r, w = os.pipe()
    478 
    479 d1 = TriggerDrain(lambda x:x=="trigger")
    480 d2 = TriggeredQueueingValve()
    481 c = QueueSink(name="c")
    482 s > d1 > d2 > c
    483 d1 ^ d2
    484 
    485 p.add(s)
    486 p.start()
    487 
    488 s.send("trigger")
    489 s.send("hello")
    490 s.send("trigger")
    491 assert c.q.get(timeout=3) == "trigger"
    492 assert c.q.get(timeout=3) in ['hello', 'trigger']
    493 assert c.q.get(timeout=3) in ['hello', 'trigger']
    494 assert d2.q.qsize() == 0
    495 
    496 p.stop()
    497 
    498 = TriggerDrain and TriggeredQueueingValve on high channel
    499 
    500 p = PipeEngine()
    501 
    502 s = CLIHighFeeder()
    503 r, w = os.pipe()
    504 
    505 d1 = TriggerDrain(lambda x:x=="trigger")
    506 d2 = TriggeredQueueingValve()
    507 c = QueueSink(name="c")
    508 s >> d1 >> d2 >> c
    509 d1 ^ d2
    510 
    511 p.add(s)
    512 p.start()
    513 
    514 s.send("trigger")
    515 s.send("hello")
    516 s.send("trigger")
    517 assert c.q.get(timeout=3) == "trigger"
    518 assert c.q.get(timeout=3) == "hello"
    519 assert d2.q.qsize() == 0
    520 
    521 p.stop()
    522 
    523 = UDPDrain
    524 
    525 p = PipeEngine()
    526 
    527 s = CLIFeeder()
    528 s2 = CLIHighFeeder()
    529 d1 = UDPDrain()
    530 c = QueueSink()
    531 
    532 s > d1 > c
    533 s2 >> d1 >> c
    534 
    535 p.add(s)
    536 p.add(s2)
    537 p.start()
    538 
    539 s.send(IP(src="127.0.0.1")/UDP()/DNS())
    540 s2.send(DNS())
    541 
    542 res = [c.q.get(timeout=2), c.q.get(timeout=2)]
    543 assert b'\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00' in res
    544 res.remove(b'\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00')
    545 assert DNS in res[0] and res[0][UDP].sport == 1234
    546 
    547 p.stop()
    548 
    549 = FDSourceSink on a Bunch object
    550 
    551 class Bunch:
    552     __init__ = lambda self, **kw: setattr(self, '__dict__', kw)
    553 
    554 fd = Bunch(write=lambda x: None, read=lambda: "hello", fileno=lambda: None)
    555 
    556 s = FDSourceSink(fd)
    557 d = Drain()
    558 c = QueueSink()
    559 s > d > c
    560 
    561 assert s.fileno() == None
    562 s.push("data")
    563 s.deliver()
    564 assert c.q.get(timeout=1) == "hello"
    565 
    566 = TCPConnectPipe networking test
    567 ~ networking needs_root
    568 
    569 p = PipeEngine()
    570 
    571 s = CLIFeeder()
    572 d1 = TCPConnectPipe(addr="www.google.fr", port=80)
    573 c = QueueSink()
    574 
    575 s > d1 > c
    576 
    577 p.add(s)
    578 p.start()
    579 
    580 s.send(b"GET http://www.google.fr/search?q=scapy&start=1&num=1\n")
    581 result = c.q.get(timeout=10)
    582 p.stop()
    583 
    584 assert result.startswith(b"HTTP/1.0 200 OK")
    585