1 # This file is part of Scapy 2 # Scapy is free software: you can redistribute it and/or modify 3 # it under the terms of the GNU General Public License as published by 4 # the Free Software Foundation, either version 2 of the License, or 5 # any later version. 6 # 7 # Scapy is distributed in the hope that it will be useful, 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 10 # GNU General Public License for more details. 11 # 12 # You should have received a copy of the GNU General Public License 13 # along with Scapy. If not, see <http://www.gnu.org/licenses/>. 14 15 # Copyright (C) 2016 Gauthier Sebaux 16 17 # scapy.contrib.description = ProfinetIO Real-Time Cyclic (RTC) 18 # scapy.contrib.status = loads 19 20 """ 21 PROFINET IO layers for scapy which correspond to Real-Time Cyclic data 22 """ 23 24 # external imports 25 from __future__ import absolute_import 26 import math 27 import struct 28 29 # Scapy imports 30 from scapy.all import Packet, bind_layers, Ether, UDP, Field, conf 31 from scapy.fields import BitEnumField, BitField, ByteField,\ 32 FlagsField,\ 33 PacketListField,\ 34 ShortField, StrFixedLenField,\ 35 XBitField, XByteField 36 37 # local imports 38 from scapy.contrib.pnio import ProfinetIO 39 from scapy.compat import orb 40 from scapy.modules.six.moves import range 41 42 43 ##################################### 44 ## PROFINET Real-Time Data Packets ## 45 ##################################### 46 47 class PNIORealTimeIOxS(Packet): 48 """IOCS and IOPS packets for PROFINET Real-Time payload""" 49 name = "PNIO RTC IOxS" 50 fields_desc = [ 51 BitEnumField("dataState", 1, 1, ["bad", "good"]), 52 BitEnumField("instance", 0, 2, ["subslot", "slot", "device", "controller"]), 53 XBitField("reserved", 0, 4), 54 BitField("extension", 0, 1), 55 ] 56 57 def extract_padding(self, s): 58 return None, s # No extra payload 59 60 61 class PNIORealTimeRawData(Packet): 62 """Raw data packets for PROFINET Real-Time payload. 63 64 It's a configurable packet whose config only includes a fix length. The 65 config parameter must then be a dict {"length": X}. 66 67 PROFINET IO specification impose this packet to be followed with an IOPS 68 (PNIORealTimeIOxS)""" 69 __slots__ = ["_config"] 70 name = "PNIO RTC Raw data" 71 fields_desc = [ 72 StrFixedLenField("load", "", length_from=lambda p: p[PNIORealTimeRawData].length()), 73 ] 74 75 def __init__(self, _pkt="", post_transform=None, _internal=0, _underlayer=None, config=None, **fields): 76 """ 77 length=None means that the length must be managed by the user. If it's 78 defined, the field will always be length-long (padded with b"\\x00" if 79 needed) 80 """ 81 self._config = config 82 Packet.__init__(self, _pkt=_pkt, post_transform=post_transform, 83 _internal=_internal, _underlayer=_underlayer, **fields) 84 85 def copy(self): 86 pkt = Packet.copy(self) 87 pkt._config = self._config 88 return pkt 89 90 def clone_with(self, *args, **kargs): 91 pkt = Packet.clone_with(self, *args, **kargs) 92 pkt._config = self._config 93 return pkt 94 95 def length(self): 96 """Get the length of the raw data""" 97 # Manage the length of the packet if a length is provided 98 return self._config["length"] 99 100 # Make sure an IOPS follows a data 101 bind_layers(PNIORealTimeRawData, PNIORealTimeIOxS) 102 103 104 105 ############################### 106 ## PROFINET Real-Time Fields ## 107 ############################### 108 109 class LowerLayerBoundPacketListField(PacketListField): 110 """PacketList which binds each underlayer of packets to the current pkt""" 111 def m2i(self, pkt, m): 112 return self.cls(m, _underlayer=pkt) 113 114 class NotionalLenField(Field): 115 """A len fields which isn't present in the machine representation, but is 116 computed from a given lambda""" 117 __slots__ = ["length_from", "count_from"] 118 def __init__(self, name, default, length_from=None, count_from=None): 119 Field.__init__(self, name, default) 120 self.length_from = length_from 121 self.count_from = count_from 122 123 def addfield(self, pkt, s, val): 124 return s # Isn't present in the machine packet 125 126 def getfield(self, pkt, s): 127 val = None 128 if self.length_from is not None: 129 val = self.length_from(pkt, s) 130 elif self.count_from is not None: 131 val = self.count_from(pkt, s) 132 return s, val 133 134 135 ############################### 136 ## PNIORealTime Configuration # 137 ############################### 138 139 # conf.contribs["PNIO_RTC"] is a dict which contains data layout for each Ethernet 140 # communications. It must be formatted as such: 141 # {(Ether.src, Ether.dst): [(start, type, config), ...]} 142 # start: index of a data field from the END of the data buffer (-1, -2, ...) 143 # type: class to be instanciated to represent these data 144 # config: a config dict, given to the type class constructor 145 conf.contribs["PNIO_RTC"] = {} 146 147 def _get_ethernet(pkt): 148 """Find the Ethernet packet of underlayer or None""" 149 ether = pkt 150 while ether is not None and not isinstance(ether, Ether): 151 ether = ether.underlayer 152 return ether 153 154 def pnio_update_config(config): 155 """Update the PNIO RTC config""" 156 conf.contribs["PNIO_RTC"].update(config) 157 158 def pnio_get_config(pkt): 159 """Retrieve the config for a given communication""" 160 # get the config based on the tuple (Ether.src, Ether.dst) 161 ether = _get_ethernet(pkt) 162 config = None 163 if ether is not None and (ether.src, ether.dst) in conf.contribs["PNIO_RTC"]: 164 config = conf.contribs["PNIO_RTC"][(ether.src, ether.dst)] 165 166 return config 167 168 169 ############################### 170 ## PROFINET Real-Time Packet ## 171 ############################### 172 173 def _pnio_rtc_guess_payload_class(_pkt, _underlayer=None, *args, **kargs): 174 """A dispatcher for the packet list field which manage the configuration 175 to fin dthe appropriate class""" 176 config = pnio_get_config(_underlayer) 177 178 if isinstance(config, list): 179 # If we have a valid config, it's a list which describe each data 180 # packets the rest being IOCS 181 cur_index = -len(_pkt) 182 for index, cls, params in config: 183 if cur_index == index: 184 return cls(_pkt, config=params, *args, **kargs) 185 186 # Not a data => IOCS packet 187 return PNIORealTimeIOxS(_pkt, *args, **kargs) 188 else: 189 # No config => Raw data which dissect the whole _pkt 190 return PNIORealTimeRawData(_pkt, 191 config={"length": len(_pkt)}, 192 *args, **kargs 193 ) 194 195 196 _PNIO_DS_FLAGS = [ 197 "primary", 198 "redundancy", 199 "validData", 200 "reserved_1", 201 "run", 202 "no_problem", 203 "reserved_2", 204 "ignore", 205 ] 206 class PNIORealTime(Packet): 207 """PROFINET cyclic real-time""" 208 name = "PROFINET Real-Time" 209 fields_desc = [ 210 NotionalLenField("len", None, length_from=lambda p, s: len(s)), 211 NotionalLenField("dataLen", None, length_from=lambda p, s: len(s[:-4].rstrip(b"\0"))), 212 LowerLayerBoundPacketListField("data", [], _pnio_rtc_guess_payload_class, length_from=lambda p: p.dataLen), 213 StrFixedLenField("padding", "", length_from=lambda p: p[PNIORealTime].padding_length()), 214 ShortField("cycleCounter", 0), 215 FlagsField("dataStatus", 0x35, 8, _PNIO_DS_FLAGS), 216 ByteField("transferStatus", 0) 217 ] 218 overload_fields = { 219 ProfinetIO: {"frameID": 0x8000}, # RT_CLASS_1 220 } 221 222 def padding_length(self): 223 """Compute the length of the padding need for the ethernet frame""" 224 fld, val = self.getfield_and_val("data") 225 226 # use the len field if available to define the padding length, eg for 227 # dissected packets 228 pkt_len = self.getfieldval("len") 229 if pkt_len is not None: 230 return max(0, pkt_len - len(fld.addfield(self, b"", val)) - 4) 231 232 if isinstance(self.underlayer, ProfinetIO) and \ 233 isinstance(self.underlayer.underlayer, UDP): 234 return max(0, 12 - len(fld.addfield(self, b"", val))) 235 else: 236 return max(0, 40 - len(fld.addfield(self, b"", val))) 237 238 @staticmethod 239 def analyse_data(packets): 240 """Analyse the data to find heuristical properties and determine 241 location and type of data""" 242 loc = PNIORealTime.find_data(packets) 243 loc = PNIORealTime.analyse_profisafe(packets, loc) 244 pnio_update_config(loc) 245 return loc 246 247 @staticmethod 248 def find_data(packets): 249 """Analyse a packet list to extract data offsets from packets data.""" 250 # a dictionary to count data offsets (ie != 0x80) 251 # It's formatted: {(src, dst): (total, [count for offset in len])} 252 heuristic = {} 253 254 # Counts possible data locations 255 # 0x80 are mainly IOxS and trailling 0x00s are just padding 256 for pkt in packets: 257 if PNIORealTime in pkt: 258 pdu = bytes(pkt[PNIORealTime])[:-4].rstrip(b"\0") 259 260 if (pkt.src, pkt.dst) not in heuristic: 261 heuristic[(pkt.src, pkt.dst)] = (0, []) 262 263 total, counts = heuristic[(pkt.src, pkt.dst)] 264 265 if len(counts) < len(pdu): 266 counts.extend([0 for _ in range(len(pdu) - len(counts))]) 267 268 for i in range(len(pdu)): 269 if orb(pdu[i]) != 0x80: 270 counts[i] += 1 271 272 comm = (pkt.src, pkt.dst) 273 heuristic[comm] = (total + 1, counts) 274 275 # Determine data locations 276 locations = {} 277 for comm in heuristic: 278 total, counts = heuristic[comm] 279 length = len(counts) 280 loc = locations[comm] = [] 281 start = None 282 for i in range(length): 283 if counts[i] > total // 2: # Data if more than half is != 0x80 284 if start is None: 285 start = i 286 else: 287 if start is not None: 288 loc.append(( 289 start - length, 290 PNIORealTimeRawData, 291 {"length": i - start} 292 )) 293 start = None 294 295 return locations 296 297 @staticmethod 298 def analyse_profisafe(packets, locations=None): 299 """Analyse a packet list to find possible PROFISafe profils. 300 301 It's based on an heuristical analysis of each payload to try to find 302 CRC and control/status byte. 303 304 locations: possible data locations. If not provided, analyse_pn_rt will 305 be called beforehand. If not given, it calls in the same time 306 analyse_data which update the configuration of the data field""" 307 # get data locations and entropy of bytes 308 if not locations: 309 locations = PNIORealTime.find_data(packets) 310 entropies = PNIORealTime.data_entropy(packets, locations) 311 312 # Try to find at least 3 high entropy successive bytes (the CRC) 313 for comm in entropies: 314 entropy = dict(entropies[comm]) # Convert tuples to key => value 315 316 for i in range(len(locations[comm])): 317 # update each location with its value after profisafe analysis 318 locations[comm][i] = \ 319 PNIORealTime.analyse_one_profisafe_location( 320 locations[comm][i], entropy 321 ) 322 323 return locations 324 325 @staticmethod 326 def analyse_one_profisafe_location(location, entropy): 327 """Analyse one PNIO RTC data location to find if its a PROFISafe 328 329 :param location: location to analyse, a tuple (start, type, config) 330 :param entropy: the entropy of each byte of the packet data 331 :returns: the configuration associated with the data 332 """ 333 start, klass, conf = location 334 if conf["length"] >= 4: # Minimal PROFISafe length 335 succ_count = 0 336 for j in range(start, start + conf["length"]): 337 # Limit for a CRC is set to 6 bit of entropy min 338 if j in entropy and entropy[j] >= 6: 339 succ_count += 1 340 else: 341 succ_count = 0 342 # PROFISafe profiles must end with at least 3 bytes of high entropy 343 if succ_count >= 3: # Possible profisafe CRC 344 return ( 345 start, 346 Profisafe, 347 {"CRC": succ_count, "length": conf["length"]} 348 ) 349 # Not a PROFISafe profile 350 return (start, klass, conf) 351 352 @staticmethod 353 def data_entropy(packets, locations=None): 354 """Analyse a packet list to find the entropy of each data byte 355 356 locations: possible data locations. If not provided, analyse_pn_rt will 357 be called beforehand. If not given, it calls in the same time 358 analyse_data which update the configuration of the data field""" 359 if not locations: 360 locations = PNIORealTime.find_data(packets) 361 362 # Retrieve the entropy of each data byte, for each communication 363 entropies = {} 364 for comm in locations: 365 if len(locations[comm]) > 0: # Doesn't append empty data 366 entropies[comm] = [] 367 comm_packets = [] 368 369 # fetch all packets from the communication 370 for pkt in packets: 371 if PNIORealTime in pkt and (pkt.src, pkt.dst) == comm: 372 comm_packets.append( 373 bytes(pkt[PNIORealTime])[:-4].rstrip(b"\0") 374 ) 375 376 # Get the entropy 377 for start, dummy, conf in locations[comm]: 378 for i in range(start, start + conf["length"]): 379 entropies[comm].append( 380 (i, entropy_of_byte(comm_packets, i)) 381 ) 382 383 return entropies 384 385 @staticmethod 386 def draw_entropy(packets, locations=None): 387 """Plot the entropy of each data byte of PN RT communication""" 388 import matplotlib.pyplot as plt 389 import matplotlib.cm as cm 390 entropies = PNIORealTime.data_entropy(packets, locations) 391 392 rows = len(entropies) 393 cur_row = 1 394 for comm in entropies: 395 index = [] 396 vals = [] 397 for i, ent in entropies[comm]: 398 index.append(i) 399 vals.append(ent) 400 401 # Offsets the indexes to get the index from the beginning 402 offset = -min(index) 403 index = [i + offset for i in index] 404 405 plt.subplot(rows, 1, cur_row) 406 plt.bar(index, vals, 0.8, color="r") 407 plt.xticks([i + 0.4 for i in index], index) 408 plt.title("Entropy from %s to %s" % comm) 409 cur_row += 1 410 plt.ylabel("Shannon Entropy") 411 412 plt.xlabel("Byte offset") # x label only on the last row 413 plt.legend() 414 415 plt.tight_layout() 416 plt.show() 417 418 def entropy_of_byte(packets, position): 419 """Compute the entropy of a byte at a given offset""" 420 counter = [0 for _ in range(256)] 421 422 # Count each byte a appearance 423 for pkt in packets: 424 if -position <= len(pkt): # position must be a negative index 425 counter[orb(pkt[position])] += 1 426 427 # Compute the Shannon entropy 428 entropy = 0 429 length = len(packets) 430 for count in counter: 431 if count > 0: 432 ratio = float(count) / length 433 entropy -= ratio * math.log(ratio, 2) 434 435 return entropy 436 437 ############### 438 ## PROFISafe ## 439 ############### 440 441 class XVarBytesField(XByteField): 442 """Variable length bytes field, from 0 to 8 bytes""" 443 __slots__ = ["length_from"] 444 def __init__(self, name, default, length=None, length_from=None): 445 self.length_from = length_from 446 if length: 447 self.length_from = lambda p, l=length: l 448 Field.__init__(self, name, default, "!Q") 449 450 def addfield(self, pkt, s, val): 451 length = self.length_from(pkt) 452 return s + struct.pack(self.fmt, self.i2m(pkt, val))[8-length:] 453 454 def getfield(self, pkt, s): 455 length = self.length_from(pkt) 456 val = struct.unpack(self.fmt, b"\x00"*(8 - length) + s[:length])[0] 457 return s[length:], self.m2i(pkt, val) 458 459 460 class Profisafe(PNIORealTimeRawData): 461 """PROFISafe profil to be encapsulated inside the PNRT.data list. 462 463 It's a configurable packet whose config includes a fix length, and a CRC 464 length. The config parameter must then be a dict {"length": X, "CRC": Y}. 465 """ 466 name = "PROFISafe" 467 fields_desc = [ 468 StrFixedLenField("load", "", length_from=lambda p: p[Profisafe].data_length()), 469 XByteField("Control_Status", 0), 470 XVarBytesField("CRC", 0, length_from=lambda p: p[Profisafe].crc_length()) 471 ] 472 def data_length(self): 473 """Return the length of the data""" 474 ret = self.length() - self.crc_length() - 1 475 return ret 476 477 def crc_length(self): 478 """Return the length of the crc""" 479 return self._config["CRC"] 480 481