1 ######################## 2 % Pipetool related tests 3 ######################## 4 5 + Basic tests 6 7 = Test default test case 8 9 s = PeriodicSource("hello", 1, name="src") 10 d1 = Drain(name="d1") 11 c = ConsoleSink(name="c") 12 tf = TransformDrain(lambda x: "Got %s" % x) 13 t = TermSink(name="PipeToolsPeriodicTest", keepterm=False) 14 s > d1 > c 15 d1 > tf > t 16 17 p = PipeEngine(s) 18 p.start() 19 time.sleep(3) 20 s.msg = [] 21 p.stop() 22 23 try: 24 os.remove("test.png") 25 except OSError: 26 pass 27 28 = Test add_pipe 29 30 s = AutoSource() 31 p = PipeEngine(s) 32 p.add(Pipe()) 33 assert len(p.active_pipes) == 2 34 35 x = p.spawn_Pipe() 36 assert len(p.active_pipes) == 3 37 assert isinstance(x, Pipe) 38 39 = Test exhausted source 40 41 s = AutoSource() 42 s._gen_data("hello") 43 s.is_exhausted = True 44 d1 = Drain(name="d1") 45 c = ConsoleSink(name="c") 46 s > d1 > c 47 48 p = PipeEngine(s) 49 p.start() 50 p.wait_and_stop() 51 52 = Test add_pipe on running instance 53 54 p = PipeEngine() 55 p.start() 56 57 s = CLIFeeder() 58 59 d1 = Drain(name="d1") 60 c = QueueSink(name="c") 61 s > d1 > c 62 63 p.add(s) 64 65 s.send("hello") 66 s.send("hi") 67 68 assert c.q.get(timeout=5) == "hello" 69 assert c.q.get(timeout=5) == "hi" 70 71 p.stop() 72 73 = Test Operators 74 75 s = AutoSource() 76 p = PipeEngine(s) 77 assert p == p 78 79 a = AutoSource() 80 b = AutoSource() 81 a >> b 82 assert len(a.high_sinks) == 1 83 assert len(a.high_sources) == 0 84 assert len(b.high_sinks) == 0 85 assert len(b.high_sources) == 1 86 a 87 b 88 89 a = AutoSource() 90 b = AutoSource() 91 a << b 92 assert len(a.high_sinks) == 0 93 assert len(a.high_sources) == 1 94 assert len(b.high_sinks) == 1 95 assert len(b.high_sources) == 0 96 a 97 b 98 99 a = AutoSource() 100 b = AutoSource() 101 a == b 102 assert len(a.sinks) == 1 103 assert len(a.sources) == 1 104 assert len(b.sinks) == 1 105 assert len(b.sources) == 1 106 107 a = AutoSource() 108 b = AutoSource() 109 a//b 110 assert len(a.high_sinks) == 1 111 assert len(a.high_sources) == 1 112 assert len(b.high_sinks) == 1 113 assert len(b.high_sources) == 1 114 115 a = AutoSource() 116 b = AutoSource() 117 a^b 118 assert len(b.trigger_sources) == 1 119 assert len(a.trigger_sinks) == 1 120 121 = Test doc 122 123 s = AutoSource() 124 p = PipeEngine(s) 125 p.list_pipes() 126 p.list_pipes_detailed() 127 128 = Test RawConsoleSink with CLIFeeder 129 130 p = PipeEngine() 131 132 s = CLIFeeder() 133 s.send("hello") 134 s.is_exhausted = True 135 136 r, w = os.pipe() 137 138 d1 = Drain(name="d1") 139 c = RawConsoleSink(name="c") 140 c._write_pipe = w 141 s > d1 > c 142 143 p.add(s) 144 p.start() 145 146 assert os.read(r, 20) == b"hello\n" 147 p.wait_and_stop() 148 149 = Test QueueSink with CLIFeeder 150 151 p = PipeEngine() 152 153 s = CLIFeeder() 154 s.send("hello") 155 s.is_exhausted = True 156 157 d1 = Drain(name="d1") 158 c = QueueSink(name="c") 159 s > d1 > c 160 161 p.add(s) 162 p.start() 163 164 p.wait_and_stop() 165 assert c.recv() == "hello" 166 167 = Test UpDrain 168 169 test_val = None 170 171 class TestSink(Sink): 172 def high_push(self, msg): 173 global test_val 174 test_val = msg 175 176 p = PipeEngine() 177 178 s = CLIFeeder() 179 s.send("hello") 180 s.is_exhausted = True 181 182 d1 = UpDrain(name="d1") 183 c = TestSink(name="c") 184 s > d1 185 d1 >> c 186 187 p.add(s) 188 p.start() 189 190 p.wait_and_stop() 191 assert test_val == "hello" 192 193 = Test DownDrain 194 195 test_val = None 196 197 class TestSink(Sink): 198 def push(self, msg): 199 global test_val 200 test_val = msg 201 202 p = PipeEngine() 203 204 s = CLIHighFeeder() 205 s.send("hello") 206 s.is_exhausted = True 207 208 d1 = DownDrain(name="d1") 209 c = TestSink(name="c") 210 s >> d1 211 d1 > c 212 213 p.add(s) 214 p.start() 215 216 p.wait_and_stop() 217 assert test_val == "hello" 218 219 + Advanced ScapyPipes pipetools tests 220 221 = Test SniffSource 222 ~ netaccess 223 224 p = PipeEngine() 225 226 s = SniffSource() 227 d1 = Drain(name="d1") 228 c = QueueSink(name="c") 229 s > d1 > c 230 231 p.add(s) 232 p.start() 233 sniff(count=3) 234 p.stop() 235 assert c.q.get() 236 237 = Test exhausted AutoSource and SniffSource 238 239 import mock 240 from scapy.error import Scapy_Exception 241 242 def _fail(): 243 raise Scapy_Exception() 244 245 a = AutoSource() 246 a._send = mock.MagicMock(side_effect=_fail) 247 a._wake_up() 248 try: 249 a.deliver() 250 except: 251 pass 252 253 s = SniffSource() 254 s.s = mock.MagicMock() 255 s.s.recv = mock.MagicMock(side_effect=_fail) 256 try: 257 s.deliver() 258 except: 259 pass 260 261 = Test RdpcapSource and WrpcapSink 262 ~ needs_root 263 264 req = Ether()/IP()/ICMP() 265 rpy = Ether()/IP('E\x00\x00\x1c\x00\x00\x00\x004\x01\x1d\x04\xd8:\xd0\x83\xc0\xa8\x00w\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00') 266 267 wrpcap("t.pcap", [req, rpy]) 268 269 p = PipeEngine() 270 271 s = RdpcapSource("t.pcap") 272 d1 = Drain(name="d1") 273 c = WrpcapSink("t2.pcap", name="c") 274 s > d1 > c 275 p.add(s) 276 p.start() 277 p.wait_and_stop() 278 279 results = rdpcap("t2.pcap") 280 281 assert raw(results[0]) == raw(req) 282 assert raw(results[1]) == raw(rpy) 283 284 os.unlink("t.pcap") 285 os.unlink("t2.pcap") 286 287 = Test InjectSink and Inject3Sink 288 ~ needs_root 289 290 import mock 291 292 a = IP(dst="192.168.0.1")/ICMP() 293 msgs = [] 294 295 class FakeSocket(object): 296 def __init__(self, *arg, **karg): 297 pass 298 def close(self): 299 pass 300 def send(self, msg): 301 global msgs 302 msgs.append(msg) 303 304 @mock.patch("scapy.scapypipes.conf.L2socket", FakeSocket) 305 @mock.patch("scapy.scapypipes.conf.L3socket", FakeSocket) 306 def _inject_sink(i3): 307 s = CLIFeeder() 308 s.send(a) 309 s.is_exhausted = True 310 d1 = Drain(name="d1") 311 c = Inject3Sink() if i3 else InjectSink() 312 s > d1 > c 313 p = PipeEngine(s) 314 p.start() 315 p.wait_and_stop() 316 317 _inject_sink(False) # InjectSink 318 _inject_sink(True) # Inject3Sink 319 320 assert msgs == [a,a] 321 322 = TriggerDrain and TriggeredValve with CLIFeeder 323 324 s = CLIFeeder() 325 d1 = TriggerDrain(lambda x:x=="trigger") 326 d2 = TriggeredValve() 327 c = QueueSink() 328 329 s > d1 > d2 > c 330 d1 ^ d2 331 332 p = PipeEngine(s) 333 p.start() 334 335 s.send("hello") 336 s.send("trigger") 337 s.send("hello2") 338 s.send("trigger") 339 s.send("hello3") 340 341 assert c.q.get(timeout=5) == "hello" 342 assert c.q.get(timeout=5) == "trigger" 343 assert c.q.get(timeout=5) == "hello3" 344 345 p.stop() 346 347 = TriggerDrain and TriggeredValve with CLIHighFeeder 348 349 s = CLIHighFeeder() 350 d1 = TriggerDrain(lambda x:x=="trigger") 351 d2 = TriggeredValve() 352 c = QueueSink() 353 354 s >> d1 355 d1 >> d2 356 d2 >> c 357 d1 ^ d2 358 359 p = PipeEngine(s) 360 p.start() 361 362 s.send("hello") 363 s.send("trigger") 364 s.send("hello2") 365 s.send("trigger") 366 s.send("hello3") 367 368 assert c.q.get(timeout=5) == "hello" 369 assert c.q.get(timeout=5) == "trigger" 370 assert c.q.get(timeout=5) == "hello3" 371 372 p.stop() 373 374 = TriggerDrain and TriggeredQueueingValve with CLIFeeder 375 376 s = CLIFeeder() 377 d1 = TriggerDrain(lambda x:x=="trigger") 378 d2 = TriggeredValve() 379 c = QueueSink() 380 381 s > d1 > d2 > c 382 d1 ^ d2 383 384 p = PipeEngine(s) 385 p.start() 386 387 s.send("hello") 388 s.send("trigger") 389 s.send("hello2") 390 s.send("trigger") 391 s.send("hello3") 392 393 assert c.q.get(timeout=5) == "hello" 394 assert c.q.get(timeout=5) == "trigger" 395 assert c.q.get(timeout=5) == "hello3" 396 397 p.stop() 398 399 = TriggerDrain and TriggeredSwitch with CLIFeeder on high channel 400 401 s = CLIFeeder() 402 d1 = TriggerDrain(lambda x:x=="trigger") 403 d2 = TriggeredSwitch() 404 c = QueueSink() 405 406 s > d1 > d2 407 d2 >> c 408 d1 ^ d2 409 410 p = PipeEngine(s) 411 p.start() 412 413 s.send("hello") 414 s.send("trigger") 415 s.send("hello2") 416 s.send("trigger") 417 s.send("hello3") 418 419 assert c.q.get(timeout=5) == "trigger" 420 assert c.q.get(timeout=5) == "hello2" 421 422 p.stop() 423 424 = TriggerDrain and TriggeredSwitch with CLIHighFeeder on low channel 425 426 s = CLIHighFeeder() 427 d1 = TriggerDrain(lambda x:x=="trigger") 428 d2 = TriggeredSwitch() 429 c = QueueSink() 430 431 s >> d1 432 d1 >> d2 433 d2 > c 434 d1 ^ d2 435 436 p = PipeEngine(s) 437 p.start() 438 439 s.send("hello") 440 s.send("trigger") 441 s.send("hello2") 442 s.send("trigger") 443 s.send("hello3") 444 445 assert c.q.get(timeout=5) == "hello" 446 assert c.q.get(timeout=5) == "trigger" 447 assert c.q.get(timeout=5) == "hello3" 448 449 p.stop() 450 451 = TriggerDrain and TriggeredMessage 452 453 s = CLIFeeder() 454 d1 = TriggerDrain(lambda x:x=="trigger") 455 d2 = TriggeredMessage("hello") 456 c = QueueSink() 457 458 s > d1 > d2 > c 459 d1 ^ d2 460 461 p = PipeEngine(s) 462 p.start() 463 464 s.send("trigger") 465 466 r = [c.q.get(timeout=5), c.q.get(timeout=5)] 467 assert "hello" in r 468 assert "trigger" in r 469 470 p.stop() 471 472 = TriggerDrain and TriggeredQueueingValve on low channel 473 474 p = PipeEngine() 475 476 s = CLIFeeder() 477 r, w = os.pipe() 478 479 d1 = TriggerDrain(lambda x:x=="trigger") 480 d2 = TriggeredQueueingValve() 481 c = QueueSink(name="c") 482 s > d1 > d2 > c 483 d1 ^ d2 484 485 p.add(s) 486 p.start() 487 488 s.send("trigger") 489 s.send("hello") 490 s.send("trigger") 491 assert c.q.get(timeout=3) == "trigger" 492 assert c.q.get(timeout=3) in ['hello', 'trigger'] 493 assert c.q.get(timeout=3) in ['hello', 'trigger'] 494 assert d2.q.qsize() == 0 495 496 p.stop() 497 498 = TriggerDrain and TriggeredQueueingValve on high channel 499 500 p = PipeEngine() 501 502 s = CLIHighFeeder() 503 r, w = os.pipe() 504 505 d1 = TriggerDrain(lambda x:x=="trigger") 506 d2 = TriggeredQueueingValve() 507 c = QueueSink(name="c") 508 s >> d1 >> d2 >> c 509 d1 ^ d2 510 511 p.add(s) 512 p.start() 513 514 s.send("trigger") 515 s.send("hello") 516 s.send("trigger") 517 assert c.q.get(timeout=3) == "trigger" 518 assert c.q.get(timeout=3) == "hello" 519 assert d2.q.qsize() == 0 520 521 p.stop() 522 523 = UDPDrain 524 525 p = PipeEngine() 526 527 s = CLIFeeder() 528 s2 = CLIHighFeeder() 529 d1 = UDPDrain() 530 c = QueueSink() 531 532 s > d1 > c 533 s2 >> d1 >> c 534 535 p.add(s) 536 p.add(s2) 537 p.start() 538 539 s.send(IP(src="127.0.0.1")/UDP()/DNS()) 540 s2.send(DNS()) 541 542 res = [c.q.get(timeout=2), c.q.get(timeout=2)] 543 assert b'\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00' in res 544 res.remove(b'\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00') 545 assert DNS in res[0] and res[0][UDP].sport == 1234 546 547 p.stop() 548 549 = FDSourceSink on a Bunch object 550 551 class Bunch: 552 __init__ = lambda self, **kw: setattr(self, '__dict__', kw) 553 554 fd = Bunch(write=lambda x: None, read=lambda: "hello", fileno=lambda: None) 555 556 s = FDSourceSink(fd) 557 d = Drain() 558 c = QueueSink() 559 s > d > c 560 561 assert s.fileno() == None 562 s.push("data") 563 s.deliver() 564 assert c.q.get(timeout=1) == "hello" 565 566 = TCPConnectPipe networking test 567 ~ networking needs_root 568 569 p = PipeEngine() 570 571 s = CLIFeeder() 572 d1 = TCPConnectPipe(addr="www.google.fr", port=80) 573 c = QueueSink() 574 575 s > d1 > c 576 577 p.add(s) 578 p.start() 579 580 s.send(b"GET http://www.google.fr/search?q=scapy&start=1&num=1\n") 581 result = c.q.get(timeout=10) 582 p.stop() 583 584 assert result.startswith(b"HTTP/1.0 200 OK") 585