Home | History | Annotate | Download | only in tls
      1 #!/usr/bin/env python
      2 
      3 ## This file is part of Scapy
      4 ## This program is published under a GPLv2 license
      5 
      6 """
      7 TLS client used in unit tests.
      8 
      9 Start our TLS client, send our send_data, and terminate session with an Alert.
Optional cipher_suite_code and version may be provided as hexadecimal strings
     11 (e.g. c09e for TLS_DHE_RSA_WITH_AES_128_CCM and 0303 for TLS 1.2).
     12 Reception of the exact send_data on the server is to be checked externally.
     13 """
     14 
     15 import sys, os, time
     16 import multiprocessing
     17 
# Resolve the directory two levels above this file and put it at the front of
# sys.path, so that it is searched first by the imports that follow.
basedir = os.path.abspath(os.path.join(os.path.dirname(__file__),"../../"))
sys.path=[basedir]+sys.path
     20 
     21 from scapy.layers.tls.automaton_cli import TLSClientAutomaton
     22 from scapy.layers.tls.handshake import TLSClientHello
     23 
     24 
# Module-level placeholders, all initialised to None. They share their names
# with the parameters of run_tls_test_client(), which shadow them inside it.
send_data = cipher_suite_code = version = None
     26 
     27 def run_tls_test_client(send_data=None, cipher_suite_code=None, version=None):
     28     if version == "0002":
     29         t = TLSClientAutomaton(data=[send_data, b"stop_server", b"quit"], version="sslv2")
     30     else:
     31         ch = TLSClientHello(version=int(version, 16), ciphers=int(cipher_suite_code, 16))
     32         t = TLSClientAutomaton(client_hello=ch, data=[send_data, b"stop_server", b"quit"], debug=1)
     33     t.run()
     34 
     35 from travis_test_server import run_tls_test_server
     36 
     37 def test_tls_client(suite, version, q):
     38     msg = ("TestC_%s_data" % suite).encode()
     39     # Run server
     40     q_ = multiprocessing.Manager().Queue()
     41     th_ = multiprocessing.Process(target=run_tls_test_server, args=(msg, q_))
     42     th_.start()
     43     # Synchronise threads
     44     assert q_.get() is True
     45     time.sleep(1)
     46     # Run client
     47     run_tls_test_client(msg, suite, version)
     48     # Wait for server
     49     th_.join(60)
     50     if th_.is_alive():
     51         th_.terminate()
     52         raise RuntimeError("Test timed out")
     53     # Return values
     54     q.put(q_.get())
     55     q.put(th_.exitcode)
     56