#!/usr/bin/env python

## This file is part of Scapy
## This program is published under a GPLv2 license

"""
TLS client used in unit tests.

Start our TLS client, send our send_data, and terminate session with an Alert.
Optional cipher_suite_code and version may be provided as hexadecimal strings
(e.g. c09e for TLS_DHE_RSA_WITH_AES_128_CCM and 0303 for TLS 1.2).
Reception of the exact send_data on the server is to be checked externally.
"""

import multiprocessing
import os
import sys
import time

# Put the directory two levels above this file first on sys.path so that the
# scapy imports below are resolved against it.
basedir = os.path.abspath(os.path.join(os.path.dirname(__file__),"../../"))
sys.path=[basedir]+sys.path

from scapy.layers.tls.automaton_cli import TLSClientAutomaton
from scapy.layers.tls.handshake import TLSClientHello

from travis_test_server import run_tls_test_server


send_data = cipher_suite_code = version = None


def run_tls_test_client(send_data=None, cipher_suite_code=None, version=None):
    """Run a TLS client automaton that sends send_data to the test server.

    The automaton is given three payloads, in order: send_data,
    b"stop_server" and b"quit".

    :param send_data: first payload handed to the automaton.
    :param cipher_suite_code: cipher suite as a hexadecimal string
        (e.g. "c09e"). Not used when version is "0002".
    :param version: protocol version as a hexadecimal string (e.g. "0303").
        The special value "0002" starts the automaton with version="sslv2"
        and no explicit ClientHello.

    Note: despite the None defaults, version and cipher_suite_code must be
    hexadecimal strings unless version == "0002"; int(None, 16) raises
    TypeError.
    """
    payloads = [send_data, b"stop_server", b"quit"]
    if version == "0002":
        t = TLSClientAutomaton(data=payloads, version="sslv2")
    else:
        ch = TLSClientHello(version=int(version, 16),
                            ciphers=int(cipher_suite_code, 16))
        t = TLSClientAutomaton(client_hello=ch, data=payloads, debug=1)
    t.run()


def test_tls_client(suite, version, q):
    """Start the test server in a child process and run the client against it.

    :param suite: cipher suite code as a hexadecimal string; also embedded in
        the test message ("TestC_<suite>_data").
    :param version: protocol version as a hexadecimal string.
    :param q: queue-like object receiving two items via put(): first the item
        the server left on its own queue after the readiness flag, then the
        server process exit code.

    :raises AssertionError: if the first item received from the server is not
        True.
    :raises RuntimeError: if the server process is still alive 60 seconds
        after the client has finished.
    """
    msg = ("TestC_%s_data" % suite).encode()
    # Run server
    q_ = multiprocessing.Manager().Queue()
    th_ = multiprocessing.Process(target=run_tls_test_server, args=(msg, q_))
    th_.start()
    try:
        # Synchronise with the server process. The get() must run
        # unconditionally: wrapping it in an `assert` would drop the call
        # (and the wait) under `python -O`, and the readiness flag would then
        # be forwarded below in place of the server's result.
        ready = q_.get()
        if ready is not True:
            raise AssertionError(
                "TLS test server did not signal readiness (got %r)" % (ready,)
            )
        time.sleep(1)
        # Run client
        run_tls_test_client(msg, suite, version)
    except BaseException:
        # Do not leave the server process behind when the handshake or the
        # client fails; the original exception is re-raised untouched.
        if th_.is_alive():
            th_.terminate()
        raise
    # Wait for server
    th_.join(60)
    if th_.is_alive():
        th_.terminate()
        raise RuntimeError("Test timed out")
    # Return values
    q.put(q_.get())
    q.put(th_.exitcode)