Home | History | Annotate | Download | only in contrib
      1 # This file is part of Scapy
      2 # Scapy is free software: you can redistribute it and/or modify
      3 # it under the terms of the GNU General Public License as published by
      4 # the Free Software Foundation, either version 2 of the License, or
      5 # any later version.
      6 #
      7 # Scapy is distributed in the hope that it will be useful,
      8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
      9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
     10 # GNU General Public License for more details.
     11 #
     12 # You should have received a copy of the GNU General Public License
     13 # along with Scapy. If not, see <http://www.gnu.org/licenses/>.
     14 
     15 # Copyright (C) 2016 Gauthier Sebaux
     16 
     17 # scapy.contrib.description = ProfinetIO Real-Time Cyclic (RTC)
     18 # scapy.contrib.status = loads
     19 
     20 """
     21 PROFINET IO layers for scapy which correspond to Real-Time Cyclic data
     22 """
     23 
     24 # external imports
     25 from __future__ import absolute_import
     26 import math
     27 import struct
     28 
     29 # Scapy imports
     30 from scapy.all import Packet, bind_layers, Ether, UDP, Field, conf
     31 from scapy.fields import BitEnumField, BitField, ByteField,\
     32         FlagsField,\
     33         PacketListField,\
     34         ShortField, StrFixedLenField,\
     35         XBitField, XByteField
     36 
     37 # local imports
     38 from scapy.contrib.pnio import ProfinetIO
     39 from scapy.compat import orb
     40 from scapy.modules.six.moves import range
     41 
     42 
     43 #####################################
     44 ## PROFINET Real-Time Data Packets ##
     45 #####################################
     46 
     47 class PNIORealTimeIOxS(Packet):
     48     """IOCS and IOPS packets for PROFINET Real-Time payload"""
     49     name = "PNIO RTC IOxS"
     50     fields_desc = [
     51         BitEnumField("dataState", 1, 1, ["bad", "good"]),
     52         BitEnumField("instance", 0, 2, ["subslot", "slot", "device", "controller"]),
     53         XBitField("reserved", 0, 4),
     54         BitField("extension", 0, 1),
     55         ]
     56 
     57     def extract_padding(self, s):
     58         return None, s      # No extra payload
     59 
     60 
     61 class PNIORealTimeRawData(Packet):
     62     """Raw data packets for PROFINET Real-Time payload.
     63 
     64     It's a configurable packet whose config only includes a fix length. The
     65     config parameter must then be a dict {"length": X}.
     66 
     67     PROFINET IO specification impose this packet to be followed with an IOPS
     68     (PNIORealTimeIOxS)"""
     69     __slots__ = ["_config"]
     70     name = "PNIO RTC Raw data"
     71     fields_desc = [
     72         StrFixedLenField("load", "", length_from=lambda p: p[PNIORealTimeRawData].length()),
     73         ]
     74 
     75     def __init__(self, _pkt="", post_transform=None, _internal=0, _underlayer=None, config=None, **fields):
     76         """
     77         length=None means that the length must be managed by the user. If it's
     78         defined, the field will always be length-long (padded with b"\\x00" if
     79         needed)
     80         """
     81         self._config = config
     82         Packet.__init__(self, _pkt=_pkt, post_transform=post_transform,
     83                         _internal=_internal, _underlayer=_underlayer, **fields)
     84 
     85     def copy(self):
     86         pkt = Packet.copy(self)
     87         pkt._config = self._config
     88         return pkt
     89 
     90     def clone_with(self, *args, **kargs):
     91         pkt = Packet.clone_with(self, *args, **kargs)
     92         pkt._config = self._config
     93         return pkt
     94 
     95     def length(self):
     96         """Get the length of the raw data"""
     97         # Manage the length of the packet if a length is provided
     98         return  self._config["length"]
     99 
    100 # Make sure an IOPS follows a data
    101 bind_layers(PNIORealTimeRawData, PNIORealTimeIOxS)
    102 
    103 
    104 
    105 ###############################
    106 ## PROFINET Real-Time Fields ##
    107 ###############################
    108 
    109 class LowerLayerBoundPacketListField(PacketListField):
    110     """PacketList which binds each underlayer of packets to the current pkt"""
    111     def m2i(self, pkt, m):
    112         return self.cls(m, _underlayer=pkt)
    113 
    114 class NotionalLenField(Field):
    115     """A len fields which isn't present in the machine representation, but is
    116     computed from a given lambda"""
    117     __slots__ = ["length_from", "count_from"]
    118     def __init__(self, name, default, length_from=None, count_from=None):
    119         Field.__init__(self, name, default)
    120         self.length_from = length_from
    121         self.count_from = count_from
    122 
    123     def addfield(self, pkt, s, val):
    124         return s   # Isn't present in the machine packet
    125 
    126     def getfield(self, pkt, s):
    127         val = None
    128         if self.length_from is not None:
    129             val = self.length_from(pkt, s)
    130         elif self.count_from is not None:
    131             val = self.count_from(pkt, s)
    132         return s, val
    133 
    134 
    135 ###############################
    136 ## PNIORealTime Configuration #
    137 ###############################
    138 
    139 # conf.contribs["PNIO_RTC"] is a dict which contains data layout for each Ethernet
    140 # communications. It must be formatted as such:
    141 # {(Ether.src, Ether.dst): [(start, type, config), ...]}
    142 # start: index of a data field from the END of the data buffer (-1, -2, ...)
    143 # type: class to be instanciated to represent these data
    144 # config: a config dict, given to the type class constructor
    145 conf.contribs["PNIO_RTC"] = {}
    146 
    147 def _get_ethernet(pkt):
    148     """Find the Ethernet packet of underlayer or None"""
    149     ether = pkt
    150     while ether is not None and not isinstance(ether, Ether):
    151         ether = ether.underlayer
    152     return ether
    153 
    154 def pnio_update_config(config):
    155     """Update the PNIO RTC config"""
    156     conf.contribs["PNIO_RTC"].update(config)
    157 
    158 def pnio_get_config(pkt):
    159     """Retrieve the config for a given communication"""
    160     # get the config based on the tuple (Ether.src, Ether.dst)
    161     ether = _get_ethernet(pkt)
    162     config = None
    163     if ether is not None and (ether.src, ether.dst) in conf.contribs["PNIO_RTC"]:
    164         config = conf.contribs["PNIO_RTC"][(ether.src, ether.dst)]
    165 
    166     return config
    167 
    168 
    169 ###############################
    170 ## PROFINET Real-Time Packet ##
    171 ###############################
    172 
    173 def _pnio_rtc_guess_payload_class(_pkt, _underlayer=None, *args, **kargs):
    174     """A dispatcher for the packet list field which manage the configuration
    175     to fin dthe appropriate class"""
    176     config = pnio_get_config(_underlayer)
    177 
    178     if isinstance(config, list):
    179         # If we have a valid config, it's a list which describe each data
    180         # packets the rest being IOCS
    181         cur_index = -len(_pkt)
    182         for index, cls, params in config:
    183             if cur_index == index:
    184                 return cls(_pkt, config=params, *args, **kargs)
    185 
    186         # Not a data => IOCS packet
    187         return PNIORealTimeIOxS(_pkt, *args, **kargs)
    188     else:
    189         # No config => Raw data which dissect the whole _pkt
    190         return PNIORealTimeRawData(_pkt,
    191                                    config={"length": len(_pkt)},
    192                                    *args, **kargs
    193                                   )
    194 
    195 
    196 _PNIO_DS_FLAGS = [
    197     "primary",
    198     "redundancy",
    199     "validData",
    200     "reserved_1",
    201     "run",
    202     "no_problem",
    203     "reserved_2",
    204     "ignore",
    205     ]
    206 class PNIORealTime(Packet):
    207     """PROFINET cyclic real-time"""
    208     name = "PROFINET Real-Time"
    209     fields_desc = [
    210         NotionalLenField("len", None, length_from=lambda p, s: len(s)),
    211         NotionalLenField("dataLen", None, length_from=lambda p, s: len(s[:-4].rstrip(b"\0"))),
    212         LowerLayerBoundPacketListField("data", [], _pnio_rtc_guess_payload_class, length_from=lambda p: p.dataLen),
    213         StrFixedLenField("padding", "", length_from=lambda p: p[PNIORealTime].padding_length()),
    214         ShortField("cycleCounter", 0),
    215         FlagsField("dataStatus", 0x35, 8, _PNIO_DS_FLAGS),
    216         ByteField("transferStatus", 0)
    217         ]
    218     overload_fields = {
    219         ProfinetIO: {"frameID": 0x8000},   # RT_CLASS_1
    220         }
    221 
    222     def padding_length(self):
    223         """Compute the length of the padding need for the ethernet frame"""
    224         fld, val = self.getfield_and_val("data")
    225 
    226         # use the len field if available to define the padding length, eg for
    227         # dissected packets
    228         pkt_len = self.getfieldval("len")
    229         if pkt_len is not None:
    230             return max(0, pkt_len - len(fld.addfield(self, b"", val)) - 4)
    231 
    232         if isinstance(self.underlayer, ProfinetIO) and \
    233                 isinstance(self.underlayer.underlayer, UDP):
    234             return max(0, 12 - len(fld.addfield(self, b"", val)))
    235         else:
    236             return max(0, 40 - len(fld.addfield(self, b"", val)))
    237 
    238     @staticmethod
    239     def analyse_data(packets):
    240         """Analyse the data to find heuristical properties and determine
    241         location and type of data"""
    242         loc = PNIORealTime.find_data(packets)
    243         loc = PNIORealTime.analyse_profisafe(packets, loc)
    244         pnio_update_config(loc)
    245         return loc
    246 
    247     @staticmethod
    248     def find_data(packets):
    249         """Analyse a packet list to extract data offsets from packets data."""
    250         # a dictionary to count data offsets (ie != 0x80)
    251         # It's formatted: {(src, dst): (total, [count for offset in len])}
    252         heuristic = {}
    253 
    254         # Counts possible data locations
    255         # 0x80 are mainly IOxS and trailling 0x00s are just padding
    256         for pkt in packets:
    257             if PNIORealTime in pkt:
    258                 pdu = bytes(pkt[PNIORealTime])[:-4].rstrip(b"\0")
    259 
    260                 if (pkt.src, pkt.dst) not in heuristic:
    261                     heuristic[(pkt.src, pkt.dst)] = (0, [])
    262 
    263                 total, counts = heuristic[(pkt.src, pkt.dst)]
    264 
    265                 if len(counts) < len(pdu):
    266                     counts.extend([0 for _ in range(len(pdu) - len(counts))])
    267 
    268                 for i in range(len(pdu)):
    269                     if orb(pdu[i]) != 0x80:
    270                         counts[i] += 1
    271 
    272                 comm = (pkt.src, pkt.dst)
    273                 heuristic[comm] = (total + 1, counts)
    274 
    275         # Determine data locations
    276         locations = {}
    277         for comm in heuristic:
    278             total, counts = heuristic[comm]
    279             length = len(counts)
    280             loc = locations[comm] = []
    281             start = None
    282             for i in range(length):
    283                 if counts[i] > total // 2:   # Data if more than half is != 0x80
    284                     if start is None:
    285                         start = i
    286                 else:
    287                     if start is not None:
    288                         loc.append((
    289                             start - length,
    290                             PNIORealTimeRawData,
    291                             {"length": i - start}
    292                             ))
    293                         start = None
    294 
    295         return locations
    296 
    297     @staticmethod
    298     def analyse_profisafe(packets, locations=None):
    299         """Analyse a packet list to find possible PROFISafe profils.
    300 
    301         It's based on an heuristical analysis of each payload to try to find
    302         CRC and control/status byte.
    303 
    304         locations: possible data locations. If not provided, analyse_pn_rt will
    305         be called beforehand. If not given, it calls in the same time
    306         analyse_data which update the configuration of the data field"""
    307         # get data locations and entropy of bytes
    308         if not locations:
    309             locations = PNIORealTime.find_data(packets)
    310         entropies = PNIORealTime.data_entropy(packets, locations)
    311 
    312         # Try to find at least 3 high entropy successive bytes (the CRC)
    313         for comm in entropies:
    314             entropy = dict(entropies[comm])  # Convert tuples to key => value
    315 
    316             for i in range(len(locations[comm])):
    317                 # update each location with its value after profisafe analysis
    318                 locations[comm][i] = \
    319                         PNIORealTime.analyse_one_profisafe_location(
    320                             locations[comm][i], entropy
    321                         )
    322 
    323         return locations
    324 
    325     @staticmethod
    326     def analyse_one_profisafe_location(location, entropy):
    327         """Analyse one PNIO RTC data location to find if its a PROFISafe
    328 
    329         :param location: location to analyse, a tuple (start, type, config)
    330         :param entropy: the entropy of each byte of the packet data
    331         :returns: the configuration associated with the data
    332         """
    333         start, klass, conf = location
    334         if conf["length"] >= 4:     # Minimal PROFISafe length
    335             succ_count = 0
    336             for j in range(start, start + conf["length"]):
    337                 # Limit for a CRC is set to 6 bit of entropy min
    338                 if j in entropy and entropy[j] >= 6:
    339                     succ_count += 1
    340                 else:
    341                     succ_count = 0
    342             # PROFISafe profiles must end with at least 3 bytes of high entropy
    343             if succ_count >= 3: # Possible profisafe CRC
    344                 return (
    345                     start,
    346                     Profisafe,
    347                     {"CRC": succ_count, "length": conf["length"]}
    348                     )
    349         # Not a PROFISafe profile
    350         return (start, klass, conf)
    351 
    352     @staticmethod
    353     def data_entropy(packets, locations=None):
    354         """Analyse a packet list to find the entropy of each data byte
    355 
    356         locations: possible data locations. If not provided, analyse_pn_rt will
    357         be called beforehand. If not given, it calls in the same time
    358         analyse_data which update the configuration of the data field"""
    359         if not locations:
    360             locations = PNIORealTime.find_data(packets)
    361 
    362         # Retrieve the entropy of each data byte, for each communication
    363         entropies = {}
    364         for comm in locations:
    365             if len(locations[comm]) > 0: # Doesn't append empty data
    366                 entropies[comm] = []
    367                 comm_packets = []
    368 
    369                 # fetch all packets from the communication
    370                 for pkt in packets:
    371                     if PNIORealTime in pkt and (pkt.src, pkt.dst) == comm:
    372                         comm_packets.append(
    373                             bytes(pkt[PNIORealTime])[:-4].rstrip(b"\0")
    374                             )
    375 
    376                 # Get the entropy
    377                 for start, dummy, conf in locations[comm]:
    378                     for i in range(start, start + conf["length"]):
    379                         entropies[comm].append(
    380                             (i, entropy_of_byte(comm_packets, i))
    381                             )
    382 
    383         return entropies
    384 
    385     @staticmethod
    386     def draw_entropy(packets, locations=None):
    387         """Plot the entropy of each data byte of PN RT communication"""
    388         import matplotlib.pyplot as plt
    389         import matplotlib.cm as cm
    390         entropies = PNIORealTime.data_entropy(packets, locations)
    391 
    392         rows = len(entropies)
    393         cur_row = 1
    394         for comm in entropies:
    395             index = []
    396             vals = []
    397             for i, ent in entropies[comm]:
    398                 index.append(i)
    399                 vals.append(ent)
    400 
    401             # Offsets the indexes to get the index from the beginning
    402             offset = -min(index)
    403             index = [i + offset for i in index]
    404 
    405             plt.subplot(rows, 1, cur_row)
    406             plt.bar(index, vals, 0.8, color="r")
    407             plt.xticks([i + 0.4 for i in index], index)
    408             plt.title("Entropy from %s to %s" % comm)
    409             cur_row += 1
    410             plt.ylabel("Shannon Entropy")
    411 
    412         plt.xlabel("Byte offset")   # x label only on the last row
    413         plt.legend()
    414 
    415         plt.tight_layout()
    416         plt.show()
    417 
    418 def entropy_of_byte(packets, position):
    419     """Compute the entropy of a byte at a given offset"""
    420     counter = [0 for _ in range(256)]
    421 
    422     # Count each byte a appearance
    423     for pkt in packets:
    424         if -position <= len(pkt):     # position must be a negative index
    425             counter[orb(pkt[position])] += 1
    426 
    427     # Compute the Shannon entropy
    428     entropy = 0
    429     length = len(packets)
    430     for count in counter:
    431         if count > 0:
    432             ratio = float(count) / length
    433             entropy -= ratio * math.log(ratio, 2)
    434 
    435     return entropy
    436 
    437 ###############
    438 ## PROFISafe ##
    439 ###############
    440 
    441 class XVarBytesField(XByteField):
    442     """Variable length bytes field, from 0 to 8 bytes"""
    443     __slots__ = ["length_from"]
    444     def __init__(self, name, default, length=None, length_from=None):
    445         self.length_from = length_from
    446         if length:
    447             self.length_from = lambda p, l=length: l
    448         Field.__init__(self, name, default, "!Q")
    449 
    450     def addfield(self, pkt, s, val):
    451         length = self.length_from(pkt)
    452         return s + struct.pack(self.fmt, self.i2m(pkt, val))[8-length:]
    453 
    454     def getfield(self, pkt, s):
    455         length = self.length_from(pkt)
    456         val = struct.unpack(self.fmt, b"\x00"*(8 - length) + s[:length])[0]
    457         return  s[length:], self.m2i(pkt, val)
    458 
    459 
    460 class Profisafe(PNIORealTimeRawData):
    461     """PROFISafe profil to be encapsulated inside the PNRT.data list.
    462 
    463     It's a configurable packet whose config includes a fix length, and a CRC
    464     length. The config parameter must then be a dict {"length": X, "CRC": Y}.
    465     """
    466     name = "PROFISafe"
    467     fields_desc = [
    468         StrFixedLenField("load", "", length_from=lambda p: p[Profisafe].data_length()),
    469         XByteField("Control_Status", 0),
    470         XVarBytesField("CRC", 0, length_from=lambda p: p[Profisafe].crc_length())
    471         ]
    472     def data_length(self):
    473         """Return the length of the data"""
    474         ret = self.length() - self.crc_length() - 1
    475         return  ret
    476 
    477     def crc_length(self):
    478         """Return the length of the crc"""
    479         return self._config["CRC"]
    480 
    481