Home | History | Annotate | Download | only in contrib
      1 # This file is part of Scapy
## See http://www.secdev.org/projects/scapy for more information
      3 ## Copyright (C) Sabrina Dubroca <sd (at] queasysnail.net>
      4 ## This program is published under a GPLv2 license
      5 
      6 """
      7 Classes and functions for MACsec.
      8 """
      9 
     10 from __future__ import absolute_import
     11 from __future__ import print_function
     12 import struct
     13 
     14 from scapy.config import conf
     15 from scapy.fields import *
     16 from scapy.packet import Packet, Raw, bind_layers
     17 from scapy.layers.l2 import Ether, Dot1AD, Dot1Q
     18 from scapy.layers.eap import MACsecSCI
     19 from scapy.layers.inet import IP
     20 from scapy.layers.inet6 import IPv6
     21 import scapy.modules.six as six
     22 
# The cryptography package is optional. When conf.crypto_valid is false the
# names Cipher, algorithms, modes, default_backend and InvalidTag are never
# bound, so MACsecSA.encrypt() and MACsecSA.decrypt() (which use them) cannot
# work; MACsecSA.encap() and MACsecSA.decap() do not reference them.
# NOTE(review): log_loading is not imported explicitly in this file; it
# presumably arrives through the wildcard import from scapy.fields -- confirm.
if conf.crypto_valid:
    from cryptography.exceptions import InvalidTag
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives.ciphers import (
        Cipher,
        algorithms,
        modes,
    )
else:
    log_loading.info("Can't import python-cryptography v1.7+. "
                     "Disabled MACsec encryption/authentication.")
     34 
     35 
# Header length, in bytes, that MACsecSA assumes when the SA does not send
# an SCI; it is the default associated-data length when encrypting.
# Presumably 14 bytes of Ethernet header plus a 6-byte SecTAG without SCI
# -- TODO confirm against the 802.1AE frame layout.
NOSCI_LEN = 14 + 6
# Extra header bytes MACsecSA accounts for when the SA sends the SCI.
SCI_LEN = 8
# ICV length, in bytes, for which MACsecSA.c_bit() does not force the C bit.
DEFAULT_ICV_LEN = 16
     39 
     40 
     41 class MACsecSA(object):
     42     """Representation of a MACsec Secure Association
     43 
     44     Provides encapsulation, decapsulation, encryption, and decryption
     45     of MACsec frames
     46     """
     47     def __init__(self, sci, an, pn, key, icvlen, encrypt, send_sci):
     48         if isinstance(sci, six.integer_types):
     49             self.sci = struct.pack('!Q', sci)
     50         elif isinstance(sci, bytes):
     51             self.sci = sci
     52         else:
     53             raise TypeError("SCI must be either bytes or int")
     54         self.an = an
     55         self.pn = pn
     56         self.key = key
     57         self.icvlen = icvlen
     58         self.do_encrypt = encrypt
     59         self.send_sci = send_sci
     60 
     61     def make_iv(self, pkt):
     62         """generate an IV for the packet"""
     63         return self.sci + struct.pack('!I', pkt[MACsec].pn)
     64 
     65     @staticmethod
     66     def split_pkt(pkt, assoclen, icvlen=0):
     67         """
     68         split the packet into associated data, plaintext or ciphertext, and
     69         optional ICV
     70         """
     71         data = raw(pkt)
     72         assoc = data[:assoclen]
     73         if icvlen:
     74             icv = data[-icvlen:]
     75             enc = data[assoclen:-icvlen]
     76         else:
     77             icv = b''
     78             enc = data[assoclen:]
     79         return assoc, enc, icv
     80 
     81     def e_bit(self):
     82         """returns the value of the E bit for packets sent through this SA"""
     83         return self.do_encrypt
     84 
     85     def c_bit(self):
     86         """returns the value of the C bit for packets sent through this SA"""
     87         return self.do_encrypt or self.icvlen != DEFAULT_ICV_LEN
     88 
     89     @staticmethod
     90     def shortlen(pkt):
     91         """determine shortlen for a raw packet (not encapsulated yet)"""
     92         datalen = len(pkt) - 2*6
     93         if datalen < 48:
     94             return datalen
     95         return 0
     96 
     97     def encap(self, pkt):
     98         """encapsulate a frame using this Secure Association"""
     99         if pkt.name != Ether().name:
    100             raise TypeError('cannot encapsulate packet in MACsec, must be Ethernet')
    101         hdr = copy.deepcopy(pkt)
    102         payload = hdr.payload
    103         del hdr.payload
    104         tag = MACsec(sci=self.sci, an=self.an,
    105                      SC=self.send_sci,
    106                      E=self.e_bit(), C=self.c_bit(),
    107                      shortlen=MACsecSA.shortlen(pkt),
    108                      pn=self.pn, type=pkt.type)
    109         hdr.type = ETH_P_MACSEC
    110         return hdr/tag/payload
    111 
    112     # this doesn't really need to be a method, but for symmetry with
    113     # encap(), it is
    114     def decap(self, orig_pkt):
    115         """decapsulate a MACsec frame"""
    116         if orig_pkt.name != Ether().name or orig_pkt.payload.name != MACsec().name:
    117             raise TypeError('cannot decapsulate MACsec packet, must be Ethernet/MACsec')
    118         packet = copy.deepcopy(orig_pkt)
    119         prev_layer = packet[MACsec].underlayer
    120         prev_layer.type = packet[MACsec].type
    121         next_layer = packet[MACsec].payload
    122         del prev_layer.payload
    123         if prev_layer.name == Ether().name:
    124             return Ether(raw(prev_layer/next_layer))
    125         return prev_layer/next_layer
    126 
    127     def encrypt(self, orig_pkt, assoclen=None):
    128         """encrypt a MACsec frame for this Secure Association"""
    129         hdr = copy.deepcopy(orig_pkt)
    130         del hdr[MACsec].payload
    131         del hdr[MACsec].type
    132         pktlen = len(orig_pkt)
    133         if self.send_sci:
    134             hdrlen = NOSCI_LEN + SCI_LEN
    135         else:
    136             hdrlen = NOSCI_LEN
    137         if assoclen is None or not self.do_encrypt:
    138             if self.do_encrypt:
    139                 assoclen = hdrlen
    140             else:
    141                 assoclen = pktlen
    142         iv = self.make_iv(orig_pkt)
    143         assoc, pt, _ = MACsecSA.split_pkt(orig_pkt, assoclen)
    144         encryptor = Cipher(
    145             algorithms.AES(self.key),
    146             modes.GCM(iv),
    147             backend=default_backend()
    148         ).encryptor()
    149         encryptor.authenticate_additional_data(assoc)
    150         ct = encryptor.update(pt) + encryptor.finalize()
    151         hdr[MACsec].payload = Raw(assoc[hdrlen:assoclen] + ct + encryptor.tag)
    152         return hdr
    153 
    154     def decrypt(self, orig_pkt, assoclen=None):
    155         """decrypt a MACsec frame for this Secure Association"""
    156         hdr = copy.deepcopy(orig_pkt)
    157         del hdr[MACsec].payload
    158         pktlen = len(orig_pkt)
    159         if self.send_sci:
    160             hdrlen = NOSCI_LEN + SCI_LEN
    161         else:
    162             hdrlen = NOSCI_LEN
    163         if assoclen is None or not self.do_encrypt:
    164             if self.do_encrypt:
    165                 assoclen = hdrlen
    166             else:
    167                 assoclen = pktlen - self.icvlen
    168         iv = self.make_iv(hdr)
    169         assoc, ct, icv = MACsecSA.split_pkt(orig_pkt, assoclen, self.icvlen)
    170         decryptor = Cipher(
    171                algorithms.AES(self.key),
    172                modes.GCM(iv, icv),
    173                backend=default_backend()
    174            ).decryptor()
    175         decryptor.authenticate_additional_data(assoc)
    176         pt = assoc[hdrlen:assoclen]
    177         pt += decryptor.update(ct)
    178         pt += decryptor.finalize()
    179         hdr[MACsec].type = struct.unpack('!H', pt[0:2])[0]
    180         hdr[MACsec].payload = Raw(pt[2:])
    181         return hdr
    182 
    183 
    184 class MACsec(Packet):
    185     """representation of one MACsec frame"""
    186     name = '802.1AE'
    187     fields_desc = [BitField('Ver', 0, 1),
    188                    BitField('ES', 0, 1),
    189                    BitField('SC', 0, 1),
    190                    BitField('SCB', 0, 1),
    191                    BitField('E', 0, 1),
    192                    BitField('C', 0, 1),
    193                    BitField('an', 0, 2),
    194                    BitField('reserved', 0, 2),
    195                    BitField('shortlen', 0, 6),
    196                    IntField("pn", 1),
    197                    ConditionalField(PacketField("sci", None, MACsecSCI), lambda pkt: pkt.SC),
    198                    ConditionalField(XShortEnumField("type", None, ETHER_TYPES),
    199                                     lambda pkt: pkt.type is not None)]
    200 
    201     def mysummary(self):
    202         summary = self.sprintf("an=%MACsec.an%, pn=%MACsec.pn%")
    203         if self.SC:
    204             summary += self.sprintf(", sci=%MACsec.sci%")
    205         if self.type is not None:
    206             summary += self.sprintf(", %MACsec.type%")
    207         return summary
    208 
    209 
# Payload of a MACsec tag: dissect as IP or IPv6 according to the tag's
# 'type' field.
bind_layers(MACsec, IP, type=ETH_P_IP)
bind_layers(MACsec, IPv6, type=ETH_P_IPV6)

# A MACsec tag follows Dot1AD, Dot1Q, or Ether when their 'type' field is
# ETH_P_MACSEC.
bind_layers( Dot1AD,        MACsec,        type=ETH_P_MACSEC)
bind_layers( Dot1Q,         MACsec,        type=ETH_P_MACSEC)
bind_layers( Ether,         MACsec,        type=ETH_P_MACSEC)
    216