1 # coding: utf8 2 3 # This file is part of Scapy 4 # Scapy is free software: you can redistribute it and/or modify 5 # it under the terms of the GNU General Public License as published by 6 # the Free Software Foundation, either version 2 of the License, or 7 # any later version. 8 # 9 # Scapy is distributed in the hope that it will be useful, 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12 # GNU General Public License for more details. 13 # 14 # You should have received a copy of the GNU General Public License 15 # along with Scapy. If not, see <http://www.gnu.org/licenses/>. 16 17 # scapy.contrib.description = ModBus Protocol 18 # scapy.contrib.status = loads 19 20 # Copyright (C) 2017 Arthur Gervais, Ken LE PRADO, Sbastien Mainand, Thomas Aurel 21 22 from scapy.packet import * 23 from scapy.fields import * 24 from scapy.layers.inet import * 25 26 # TODO: implement serial specific function codes 27 28 _modbus_exceptions = {1: "Illegal Function Code", 29 2: "Illegal Data Address", 30 3: "Illegal Data Value", 31 4: "Server Device Failure", 32 5: "Acknowledge", 33 6: "Server Device Busy", 34 8: "Memory Parity Error", 35 10: "Gateway Path Unavailable", 36 11: "Gateway Target Device Failed to Respond"} 37 38 39 class ModbusPDU01ReadCoilsRequest(Packet): 40 name = "Read Coils Request" 41 fields_desc = [XByteField("funcCode", 0x01), 42 XShortField("startAddr", 0x0000), # 0x0000 to 0xFFFF 43 XShortField("quantity", 0x0001)] 44 45 def extract_padding(self, s): 46 return b"", None 47 48 49 class ModbusPDU01ReadCoilsResponse(Packet): 50 name = "Read Coils Response" 51 fields_desc = [XByteField("funcCode", 0x01), 52 BitFieldLenField("byteCount", None, 8, count_of="coilStatus"), 53 FieldListField("coilStatus", [0x00], ByteField("", 0x00), count_from=lambda pkt: pkt.byteCount)] 54 55 def extract_padding(self, s): 56 return b"", None 57 58 59 class ModbusPDU01ReadCoilsError(Packet): 60 name = "Read Coils Exception" 61 fields_desc = [XByteField("funcCode", 0x81), 62 ByteEnumField("exceptCode", 1, _modbus_exceptions)] 63 64 def extract_padding(self, s): 65 return b"", None 66 67 68 class ModbusPDU02ReadDiscreteInputsRequest(Packet): 69 name = "Read Discrete Inputs" 70 fields_desc = [XByteField("funcCode", 0x02), 71 XShortField("startAddr", 0x0000), 72 XShortField("quantity", 0x0001)] 73 74 def extract_padding(self, s): 75 return b"", None 76 77 78 class ModbusPDU02ReadDiscreteInputsResponse(Packet): 79 """ inputStatus: result is represented as bytes, padded with 0 to have a 80 integer number of bytes. The field does not parse this result and 81 present the bytes directly 82 """ 83 name = "Read Discrete Inputs Response" 84 fields_desc = [XByteField("funcCode", 0x02), 85 BitFieldLenField("byteCount", None, 8, count_of="inputStatus"), 86 FieldListField("inputStatus", [0x00], ByteField("", 0x00), count_from=lambda pkt: pkt.byteCount)] 87 88 89 class ModbusPDU02ReadDiscreteInputsError(Packet): 90 name = "Read Discrete Inputs Exception" 91 fields_desc = [XByteField("funcCode", 0x82), 92 ByteEnumField("exceptCode", 1, _modbus_exceptions)] 93 94 95 class ModbusPDU03ReadHoldingRegistersRequest(Packet): 96 name = "Read Holding Registers" 97 fields_desc = [XByteField("funcCode", 0x03), 98 XShortField("startAddr", 0x0000), 99 XShortField("quantity", 0x0001)] 100 101 def extract_padding(self, s): 102 return b"", None 103 104 105 class ModbusPDU03ReadHoldingRegistersResponse(Packet): 106 name = "Read Holding Registers Response" 107 fields_desc = [XByteField("funcCode", 0x03), 108 BitFieldLenField("byteCount", None, 8, count_of="registerVal", adjust=lambda pkt, x: x*2), 109 FieldListField("registerVal", [0x0000], ShortField("", 0x0000), 110 count_from=lambda pkt: pkt.byteCount)] 111 112 113 class ModbusPDU03ReadHoldingRegistersError(Packet): 114 name = "Read Holding Registers Exception" 115 fields_desc = [XByteField("funcCode", 0x83), 116 ByteEnumField("exceptCode", 1, _modbus_exceptions)] 117 118 119 class ModbusPDU04ReadInputRegistersRequest(Packet): 120 name = "Read Input Registers" 121 fields_desc = [XByteField("funcCode", 0x04), 122 XShortField("startAddr", 0x0000), 123 XShortField("quantity", 0x0001)] 124 125 def extract_padding(self, s): 126 return b"", None 127 128 129 class ModbusPDU04ReadInputRegistersResponse(Packet): 130 name = "Read Input Registers Response" 131 fields_desc = [XByteField("funcCode", 0x04), 132 BitFieldLenField("byteCount", None, 8, count_of="registerVal", adjust=lambda pkt, x: x*2), 133 FieldListField("registerVal", [0x0000], ShortField("", 0x0000), 134 count_from=lambda pkt: pkt.byteCount)] 135 136 137 class ModbusPDU04ReadInputRegistersError(Packet): 138 name = "Read Input Registers Exception" 139 fields_desc = [XByteField("funcCode", 0x84), 140 ByteEnumField("exceptCode", 1, _modbus_exceptions)] 141 142 143 class ModbusPDU05WriteSingleCoilRequest(Packet): 144 name = "Write Single Coil" 145 fields_desc = [XByteField("funcCode", 0x05), 146 XShortField("outputAddr", 0x0000), # from 0x0000 to 0xFFFF 147 XShortField("outputValue", 0x0000)] # 0x0000 == Off, 0xFF00 == On 148 149 150 class ModbusPDU05WriteSingleCoilResponse(Packet): # The answer is the same as the request if successful 151 name = "Write Single Coil" 152 fields_desc = [XByteField("funcCode", 0x05), 153 XShortField("outputAddr", 0x0000), # from 0x0000 to 0xFFFF 154 XShortField("outputValue", 0x0000)] # 0x0000 == Off, 0xFF00 == On 155 156 157 class ModbusPDU05WriteSingleCoilError(Packet): 158 name = "Write Single Coil Exception" 159 fields_desc = [XByteField("funcCode", 0x85), 160 ByteEnumField("exceptCode", 1, _modbus_exceptions)] 161 162 163 class ModbusPDU06WriteSingleRegisterRequest(Packet): 164 name = "Write Single Register" 165 fields_desc = [XByteField("funcCode", 0x06), 166 XShortField("registerAddr", 0x0000), 167 XShortField("registerValue", 0x0000)] 168 169 def extract_padding(self, s): 170 return b"", None 171 172 173 class ModbusPDU06WriteSingleRegisterResponse(Packet): 174 name = "Write Single Register Response" 175 fields_desc = [XByteField("funcCode", 0x06), 176 XShortField("registerAddr", 0x0000), 177 XShortField("registerValue", 0x0000)] 178 179 180 class ModbusPDU06WriteSingleRegisterError(Packet): 181 name = "Write Single Register Exception" 182 fields_desc = [XByteField("funcCode", 0x86), 183 ByteEnumField("exceptCode", 1, _modbus_exceptions)] 184 185 186 class ModbusPDU07ReadExceptionStatusRequest(Packet): 187 name = "Read Exception Status" 188 fields_desc = [XByteField("funcCode", 0x07)] 189 190 def extract_padding(self, s): 191 return b"", None 192 193 194 class ModbusPDU07ReadExceptionStatusResponse(Packet): 195 name = "Read Exception Status Response" 196 fields_desc = [XByteField("funcCode", 0x07), 197 XByteField("startingAddr", 0x00)] 198 199 200 class ModbusPDU07ReadExceptionStatusError(Packet): 201 name = "Read Exception Status Exception" 202 fields_desc = [XByteField("funcCode", 0x87), 203 ByteEnumField("exceptCode", 1, _modbus_exceptions)] 204 205 206 class ModbusPDU0FWriteMultipleCoilsRequest(Packet): 207 name = "Write Multiple Coils" 208 fields_desc = [XByteField("funcCode", 0x0F), 209 XShortField("startingAddr", 0x0000), 210 XShortField("quantityOutput", 0x0001), 211 BitFieldLenField("byteCount", None, 8, count_of="outputsValue"), 212 FieldListField("outputsValue", [0x00], XByteField("", 0x00), count_from=lambda pkt: pkt.byteCount)] 213 214 def extract_padding(self, s): 215 return b"", None 216 217 218 class ModbusPDU0FWriteMultipleCoilsResponse(Packet): 219 name = "Write Multiple Coils Response" 220 fields_desc = [XByteField("funcCode", 0x0F), 221 XShortField("startingAddr", 0x0000), 222 XShortField("quantityOutput", 0x0001)] 223 224 225 class ModbusPDU0FWriteMultipleCoilsError(Packet): 226 name = "Write Multiple Coils Exception" 227 fields_desc = [XByteField("funcCode", 0x8F), 228 ByteEnumField("exceptCode", 1, _modbus_exceptions)] 229 230 231 class ModbusPDU10WriteMultipleRegistersRequest(Packet): 232 name = "Write Multiple Registers" 233 fields_desc = [XByteField("funcCode", 0x10), 234 XShortField("startingAddr", 0x0000), 235 BitFieldLenField("quantityRegisters", None, 16, count_of="outputsValue",), 236 BitFieldLenField("byteCount", None, 8, count_of="outputsValue", adjust=lambda pkt, x: x*2), 237 FieldListField("outputsValue", [0x0000], XShortField("", 0x0000), 238 count_from=lambda pkt: pkt.byteCount)] 239 240 241 class ModbusPDU10WriteMultipleRegistersResponse(Packet): 242 name = "Write Multiple Registers Response" 243 fields_desc = [XByteField("funcCode", 0x10), 244 XShortField("startingAddr", 0x0000), 245 XShortField("quantityRegisters", 0x0001)] 246 247 248 class ModbusPDU10WriteMultipleRegistersError(Packet): 249 name = "Write Multiple Registers Exception" 250 fields_desc = [XByteField("funcCode", 0x90), 251 ByteEnumField("exceptCode", 1, _modbus_exceptions)] 252 253 254 class ModbusPDU11ReportSlaveIdRequest(Packet): 255 name = "Report Slave Id" 256 fields_desc = [XByteField("funcCode", 0x11)] 257 258 def extract_padding(self, s): 259 return b"", None 260 261 262 class ModbusPDU11ReportSlaveIdResponse(Packet): 263 name = "Report Slave Id Response" 264 fields_desc = [XByteField("funcCode", 0x11), 265 BitFieldLenField("byteCount", None, 8, length_of="slaveId"), 266 ConditionalField(StrLenField("slaveId", "", length_from=lambda pkt: pkt.byteCount), 267 lambda pkt: pkt.byteCount > 0), 268 ConditionalField(XByteField("runIdicatorStatus", 0x00), lambda pkt: pkt.byteCount > 0)] 269 270 271 class ModbusPDU11ReportSlaveIdError(Packet): 272 name = "Report Slave Id Exception" 273 fields_desc = [XByteField("funcCode", 0x91), 274 ByteEnumField("exceptCode", 1, _modbus_exceptions)] 275 276 277 class ModbusReadFileSubRequest(Packet): 278 name = "Sub-request of Read File Record" 279 fields_desc = [ByteField("refType", 0x06), 280 ShortField("fileNumber", 0x0001), 281 ShortField("recordNumber", 0x0000), 282 ShortField("recordLength", 0x0001)] 283 284 def guess_payload_class(self, payload): 285 return ModbusReadFileSubRequest 286 287 288 class ModbusPDU14ReadFileRecordRequest(Packet): 289 name = "Read File Record" 290 fields_desc = [XByteField("funcCode", 0x14), 291 ByteField("byteCount", None)] 292 293 def guess_payload_class(self, payload): 294 if self.byteCount > 0: 295 return ModbusReadFileSubRequest 296 else: 297 return Packet.guess_payload_class(self, payload) 298 299 def post_build(self, p, pay): 300 if self.byteCount is None: 301 l = len(pay) 302 p = p[:1] + struct.pack("!B", l) + p[3:] 303 return p + pay 304 305 306 class ModbusReadFileSubResponse(Packet): 307 name = "Sub-response" 308 fields_desc = [BitFieldLenField("respLength", None, 8, count_of="recData", adjust=lambda pkt, p: p*2+1), 309 ByteField("refType", 0x06), 310 FieldListField("recData", [0x0000], XShortField("", 0x0000), 311 count_from=lambda pkt: (pkt.respLength-1)//2)] 312 313 def guess_payload_class(self, payload): 314 return ModbusReadFileSubResponse 315 316 317 class ModbusPDU14ReadFileRecordResponse(Packet): 318 name = "Read File Record Response" 319 fields_desc = [XByteField("funcCode", 0x14), 320 ByteField("dataLength", None)] 321 322 def post_build(self, p, pay): 323 if self.dataLength is None: 324 l = len(pay) 325 p = p[:1] + struct.pack("!B", l) + p[3:] 326 return p + pay 327 328 def guess_payload_class(self, payload): 329 if self.dataLength > 0: 330 return ModbusReadFileSubResponse 331 else: 332 return Packet.guess_payload_class(self, payload) 333 334 335 class ModbusPDU14ReadFileRecordError(Packet): 336 name = "Read File Record Exception" 337 fields_desc = [XByteField("funcCode", 0x94), 338 ByteEnumField("exceptCode", 1, _modbus_exceptions)] 339 340 341 # 0x15 : Write File Record 342 class ModbusWriteFileSubRequest(Packet): 343 name = "Sub request of Write File Record" 344 fields_desc = [ByteField("refType", 0x06), 345 ShortField("fileNumber", 0x0001), 346 ShortField("recordNumber", 0x0000), 347 BitFieldLenField("recordLength", None, 16, length_of="recordData", adjust=lambda pkt, p: p//2), 348 FieldListField("recordData", [0x0000], ShortField("", 0x0000), 349 length_from=lambda pkt: pkt.recordLength*2)] 350 351 def guess_payload_class(self, payload): 352 if payload: 353 return ModbusWriteFileSubRequest 354 355 356 class ModbusPDU15WriteFileRecordRequest(Packet): 357 name = "Write File Record" 358 fields_desc = [XByteField("funcCode", 0x15), 359 ByteField("dataLength", None)] 360 361 def post_build(self, p, pay): 362 if self.dataLength is None: 363 l = len(pay) 364 p = p[:1] + struct.pack("!B", l) + p[3:] 365 return p + pay 366 367 def guess_payload_class(self, payload): 368 if self.dataLength > 0: 369 return ModbusWriteFileSubRequest 370 else: 371 return Packet.guess_payload_class(self, payload) 372 373 374 class ModbusWriteFileSubResponse(ModbusWriteFileSubRequest): 375 name = "Sub response of Write File Record" 376 377 def guess_payload_class(self, payload): 378 if payload: 379 return ModbusWriteFileSubResponse 380 381 382 class ModbusPDU15WriteFileRecordResponse(ModbusPDU15WriteFileRecordRequest): 383 name = "Write File Record Response" 384 385 def guess_payload_class(self, payload): 386 if self.dataLength > 0: 387 return ModbusWriteFileSubResponse 388 else: 389 return Packet.guess_payload_class(self, payload) 390 391 392 class ModbusPDU15WriteFileRecordError(Packet): 393 name = "Write File Record Exception" 394 fields_desc = [XByteField("funcCode", 0x95), 395 ByteEnumField("exceptCode", 1, _modbus_exceptions)] 396 397 398 class ModbusPDU16MaskWriteRegisterRequest(Packet): 399 # and/or to 0xFFFF/0x0000 so that nothing is changed in memory 400 name = "Mask Write Register" 401 fields_desc = [XByteField("funcCode", 0x16), 402 XShortField("refAddr", 0x0000), 403 XShortField("andMask", 0xffff), 404 XShortField("orMask", 0x0000)] 405 406 407 class ModbusPDU16MaskWriteRegisterResponse(Packet): 408 name = "Mask Write Register Response" 409 fields_desc = [XByteField("funcCode", 0x16), 410 XShortField("refAddr", 0x0000), 411 XShortField("andMask", 0xffff), 412 XShortField("orMask", 0x0000)] 413 414 415 class ModbusPDU16MaskWriteRegisterError(Packet): 416 name = "Mask Write Register Exception" 417 fields_desc = [XByteField("funcCode", 0x96), 418 ByteEnumField("exceptCode", 1, _modbus_exceptions)] 419 420 421 class ModbusPDU17ReadWriteMultipleRegistersRequest(Packet): 422 name = "Read Write Multiple Registers" 423 fields_desc = [XByteField("funcCode", 0x17), 424 XShortField("readStartingAddr", 0x0000), 425 XShortField("readQuantityRegisters", 0x0001), 426 XShortField("writeStartingAddr", 0x0000), 427 BitFieldLenField("writeQuantityRegisters", None, 16, count_of="writeRegistersValue"), 428 BitFieldLenField("byteCount", None, 8, count_of="writeRegistersValue", adjust=lambda pkt, x: x*2), 429 FieldListField("writeRegistersValue", [0x0000], XShortField("", 0x0000), 430 count_from=lambda pkt: pkt.byteCount)] 431 432 433 class ModbusPDU17ReadWriteMultipleRegistersResponse(Packet): 434 name = "Read Write Multiple Registers Response" 435 fields_desc = [XByteField("funcCode", 0x17), 436 BitFieldLenField("byteCount", None, 8, count_of="registerVal", adjust=lambda pkt, x: x*2), 437 FieldListField("registerVal", [0x0000], ShortField("", 0x0000), 438 count_from=lambda pkt: pkt.byteCount)] 439 440 441 class ModbusPDU17ReadWriteMultipleRegistersError(Packet): 442 name = "Read Write Multiple Exception" 443 fields_desc = [XByteField("funcCode", 0x97), 444 ByteEnumField("exceptCode", 1, _modbus_exceptions)] 445 446 447 class ModbusPDU18ReadFIFOQueueRequest(Packet): 448 name = "Read FIFO Queue" 449 fields_desc = [XByteField("funcCode", 0x18), 450 XShortField("FIFOPointerAddr", 0x0000)] 451 452 453 class ModbusPDU18ReadFIFOQueueResponse(Packet): 454 name = "Read FIFO Queue Response" 455 fields_desc = [XByteField("funcCode", 0x18), 456 # TODO: ByteCount must includes size of FIFOCount 457 BitFieldLenField("byteCount", None, 16, count_of="FIFOVal", adjust=lambda pkt, p: p*2+2), 458 BitFieldLenField("FIFOCount", None, 16, count_of="FIFOVal"), 459 FieldListField("FIFOVal", [], ShortField("", 0x0000), count_from=lambda pkt: pkt.byteCount)] 460 461 462 class ModbusPDU18ReadFIFOQueueError(Packet): 463 name = "Read FIFO Queue Exception" 464 fields_desc = [XByteField("funcCode", 0x98), 465 ByteEnumField("exceptCode", 1, _modbus_exceptions)] 466 467 468 # TODO: not implemented, out of the main specification 469 # class ModbusPDU2B0DCANOpenGeneralReferenceRequest(Packet): 470 # name = "CANopen General Reference Request" 471 # fields_desc = [] 472 # 473 # 474 # class ModbusPDU2B0DCANOpenGeneralReferenceResponse(Packet): 475 # name = "CANopen General Reference Response" 476 # fields_desc = [] 477 # 478 # 479 # class ModbusPDU2B0DCANOpenGeneralReferenceError(Packet): 480 # name = "CANopen General Reference Error" 481 # fields_desc = [] 482 483 484 # 0x2B/0x0E - Read Device Identification values 485 _read_device_id_codes = {1: "Basic", 486 2: "Regular", 487 3: "Extended", 488 4: "Specific"} 489 # 0x00->0x02: mandatory 490 # 0x03->0x06: optional 491 # 0x07->0x7F: Reserved (optional) 492 # 0x80->0xFF: product dependent private objects (optional) 493 _read_device_id_object_id = {0x00: "VendorName", 494 0x01: "ProductCode", 495 0x02: "MajorMinorRevision", 496 0x03: "VendorUrl", 497 0x04: "ProductName", 498 0x05: "ModelName", 499 0x06: "UserApplicationName"} 500 _read_device_id_conformity_lvl = {0x01: "Basic Identification (stream only)", 501 0x02: "Regular Identification (stream only)", 502 0x03: "Extended Identification (stream only)", 503 0x81: "Basic Identification (stream and individual access)", 504 0x82: "Regular Identification (stream and individual access)", 505 0x83: "Extended Identification (stream and individual access)"} 506 _read_device_id_more_follow = {0x00: "No", 507 0x01: "Yes"} 508 509 510 class ModbusPDU2B0EReadDeviceIdentificationRequest(Packet): 511 name = "Read Device Identification" 512 fields_desc = [XByteField("funcCode", 0x2B), 513 XByteField("MEIType", 0x0E), 514 ByteEnumField("readCode", 1, _read_device_id_codes), 515 ByteEnumField("objectId", 0x00, _read_device_id_object_id)] 516 517 518 class ModbusPDU2B0EReadDeviceIdentificationResponse(Packet): 519 name = "Read Device Identification" 520 fields_desc = [XByteField("funcCode", 0x2B), 521 XByteField("MEIType", 0x0E), 522 ByteEnumField("readCode", 4, _read_device_id_codes), 523 ByteEnumField("conformityLevel", 0x01, _read_device_id_conformity_lvl), 524 ByteEnumField("more", 0x00, _read_device_id_more_follow), 525 ByteEnumField("nextObjId", 0x00, _read_device_id_object_id), 526 ByteField("objCount", 0x00)] 527 528 def guess_payload_class(self, payload): 529 if self.objCount > 0: 530 return ModbusObjectId 531 else: 532 return Packet.guess_payload_class(self, payload) 533 534 535 class ModbusPDU2B0EReadDeviceIdentificationError(Packet): 536 name = "Read Exception Status Exception" 537 fields_desc = [XByteField("funcCode", 0xAB), 538 ByteEnumField("exceptCode", 1, _modbus_exceptions)] 539 540 541 _reserved_funccode_request = { 542 0x08: '0x08 Unknown Reserved Request', 543 0x09: '0x09 Unknown Reserved Request', 544 0x0A: '0x0a Unknown Reserved Request', 545 0x0D: '0x0d Unknown Reserved Request', 546 0x0E: '0x0e Unknown Reserved Request', 547 0x29: '0x29 Unknown Reserved Request', 548 0x2A: '0x2a Unknown Reserved Request', 549 0x5A: 'Specific Schneider Electric Request', 550 0x5B: '0x5b Unknown Reserved Request', 551 0x7D: '0x7d Unknown Reserved Request', 552 0x7E: '0x7e Unknown Reserved Request', 553 0x7F: '0x7f Unknown Reserved Request', 554 } 555 556 _reserved_funccode_response = { 557 0x08: '0x08 Unknown Reserved Response', 558 0x09: '0x09 Unknown Reserved Response', 559 0x0A: '0x0a Unknown Reserved Response', 560 0x0D: '0x0d Unknown Reserved Response', 561 0x0E: '0x0e Unknown Reserved Response', 562 0x29: '0x29 Unknown Reserved Response', 563 0x2A: '0x2a Unknown Reserved Response', 564 0x5A: 'Specific Schneider Electric Response', 565 0x5B: '0x5b Unknown Reserved Response', 566 0x7D: '0x7d Unknown Reserved Response', 567 0x7E: '0x7e Unknown Reserved Response', 568 0x7F: '0x7f Unknown Reserved Response', 569 } 570 571 _reserved_funccode_error = { 572 0x88: '0x88 Unknown Reserved Error', 573 0x89: '0x89 Unknown Reserved Error', 574 0x8A: '0x8a Unknown Reserved Error', 575 0x8D: '0x8d Unknown Reserved Error', 576 0x8E: '0x8e Unknown Reserved Error', 577 0xA9: '0x88 Unknown Reserved Error', 578 0xAA: '0x88 Unknown Reserved Error', 579 0xDA: 'Specific Schneider Electric Error', 580 0xDB: '0xdb Unknown Reserved Error', 581 0xDC: '0xdc Unknown Reserved Error', 582 0xFD: '0xfd Unknown Reserved Error', 583 0xFE: '0xfe Unknown Reserved Error', 584 0xFF: '0xff Unknown Reserved Error', 585 } 586 587 588 class ModbusPDUReservedFunctionCodeRequest(Packet): 589 name = "Reserved Function Code Request" 590 fields_desc = [ 591 ByteEnumField("funcCode", 0x00, _reserved_funccode_request), 592 StrFixedLenField('payload', '', 255), ] 593 594 def extract_padding(self, s): 595 return b"", None 596 597 def mysummary(self): 598 return self.sprintf("Modbus Reserved Request %funcCode%") 599 600 601 class ModbusPDUReservedFunctionCodeResponse(Packet): 602 name = "Reserved Function Code Response" 603 fields_desc = [ 604 ByteEnumField("funcCode", 0x00, _reserved_funccode_response), 605 StrFixedLenField('payload', '', 255), ] 606 607 def extract_padding(self, s): 608 return b"", None 609 610 def mysummary(self): 611 return self.sprintf("Modbus Reserved Response %funcCode%") 612 613 614 class ModbusPDUReservedFunctionCodeError(Packet): 615 name = "Reserved Function Code Error" 616 fields_desc = [ 617 ByteEnumField("funcCode", 0x00, _reserved_funccode_error), 618 StrFixedLenField('payload', '', 255), ] 619 620 def extract_padding(self, s): 621 return b"", None 622 623 def mysummary(self): 624 return self.sprintf("Modbus Reserved Error %funcCode%") 625 626 627 _userdefined_funccode_request = { 628 } 629 _userdefined_funccode_response = { 630 } 631 _userdefined_funccode_error = { 632 } 633 634 635 class ModbusByteEnumField(EnumField): 636 __slots__ = "defEnum" 637 638 def __init__(self, name, default, enum, defEnum): 639 EnumField.__init__(self, name, default, enum, "B") 640 defEnum = self.defEnum = defEnum 641 642 def i2repr_one(self, pkt, x): 643 if self not in conf.noenum and not isinstance(x, VolatileValue) \ 644 and x in self.i2s: 645 return self.i2s[x] 646 if self.defEnum: 647 return self.defEnum 648 return repr(x) 649 650 651 class ModbusPDUUserDefinedFunctionCodeRequest(Packet): 652 name = "User-Defined Function Code Request" 653 fields_desc = [ 654 ModbusByteEnumField( 655 "funcCode", 0x00, _userdefined_funccode_request, 656 "Unknown user-defined request function Code"), 657 StrFixedLenField('payload', '', 255), ] 658 659 def extract_padding(self, s): 660 return b"", None 661 662 def mysummary(self): 663 return self.sprintf("Modbus User-Defined Request %funcCode%") 664 665 666 class ModbusPDUUserDefinedFunctionCodeResponse(Packet): 667 name = "User-Defined Function Code Response" 668 fields_desc = [ 669 ModbusByteEnumField( 670 "funcCode", 0x00, _userdefined_funccode_response, 671 "Unknown user-defined response function Code"), 672 StrFixedLenField('payload', '', 255), ] 673 674 def extract_padding(self, s): 675 return b"", None 676 677 def mysummary(self): 678 return self.sprintf("Modbus User-Defined Response %funcCode%") 679 680 681 class ModbusPDUUserDefinedFunctionCodeError(Packet): 682 name = "User-Defined Function Code Error" 683 fields_desc = [ 684 ModbusByteEnumField( 685 "funcCode", 0x00, _userdefined_funccode_error, 686 "Unknown user-defined error function Code"), 687 StrFixedLenField('payload', '', 255), ] 688 689 def extract_padding(self, s): 690 return b"", None 691 692 def mysummary(self): 693 return self.sprintf("Modbus User-Defined Error %funcCode%") 694 695 696 class ModbusObjectId(Packet): 697 name = "Object" 698 fields_desc = [ByteEnumField("id", 0x00, _read_device_id_object_id), 699 BitFieldLenField("length", None, 8, length_of="value"), 700 StrLenField("value", "", length_from=lambda pkt: pkt.length)] 701 702 def guess_payload_class(self, payload): 703 return ModbusObjectId 704 705 706 _modbus_request_classes = { 707 0x01: ModbusPDU01ReadCoilsRequest, 708 0x02: ModbusPDU02ReadDiscreteInputsRequest, 709 0x03: ModbusPDU03ReadHoldingRegistersRequest, 710 0x04: ModbusPDU04ReadInputRegistersRequest, 711 0x05: ModbusPDU05WriteSingleCoilRequest, 712 0x06: ModbusPDU06WriteSingleRegisterRequest, 713 0x07: ModbusPDU07ReadExceptionStatusRequest, 714 0x0F: ModbusPDU0FWriteMultipleCoilsRequest, 715 0x10: ModbusPDU10WriteMultipleRegistersRequest, 716 0x11: ModbusPDU11ReportSlaveIdRequest, 717 0x14: ModbusPDU14ReadFileRecordRequest, 718 0x15: ModbusPDU15WriteFileRecordRequest, 719 0x16: ModbusPDU16MaskWriteRegisterRequest, 720 0x17: ModbusPDU17ReadWriteMultipleRegistersRequest, 721 0x18: ModbusPDU18ReadFIFOQueueRequest, 722 } 723 _modbus_error_classes = { 724 0x81: ModbusPDU01ReadCoilsError, 725 0x82: ModbusPDU02ReadDiscreteInputsError, 726 0x83: ModbusPDU03ReadHoldingRegistersError, 727 0x84: ModbusPDU04ReadInputRegistersError, 728 0x85: ModbusPDU05WriteSingleCoilError, 729 0x86: ModbusPDU06WriteSingleRegisterError, 730 0x87: ModbusPDU07ReadExceptionStatusError, 731 0x8F: ModbusPDU0FWriteMultipleCoilsError, 732 0x90: ModbusPDU10WriteMultipleRegistersError, 733 0x91: ModbusPDU11ReportSlaveIdError, 734 0x94: ModbusPDU14ReadFileRecordError, 735 0x95: ModbusPDU15WriteFileRecordError, 736 0x96: ModbusPDU16MaskWriteRegisterError, 737 0x97: ModbusPDU17ReadWriteMultipleRegistersError, 738 0x98: ModbusPDU18ReadFIFOQueueError, 739 0xAB: ModbusPDU2B0EReadDeviceIdentificationError, 740 } 741 _modbus_response_classes = { 742 0x01: ModbusPDU01ReadCoilsResponse, 743 0x02: ModbusPDU02ReadDiscreteInputsResponse, 744 0x03: ModbusPDU03ReadHoldingRegistersResponse, 745 0x04: ModbusPDU04ReadInputRegistersResponse, 746 0x05: ModbusPDU05WriteSingleCoilResponse, 747 0x06: ModbusPDU06WriteSingleRegisterResponse, 748 0x07: ModbusPDU07ReadExceptionStatusResponse, 749 0x0F: ModbusPDU0FWriteMultipleCoilsResponse, 750 0x10: ModbusPDU10WriteMultipleRegistersResponse, 751 0x11: ModbusPDU11ReportSlaveIdResponse, 752 0x14: ModbusPDU14ReadFileRecordResponse, 753 0x15: ModbusPDU15WriteFileRecordResponse, 754 0x16: ModbusPDU16MaskWriteRegisterResponse, 755 0x17: ModbusPDU17ReadWriteMultipleRegistersResponse, 756 0x18: ModbusPDU18ReadFIFOQueueResponse, 757 } 758 _mei_types_request = { 759 0x0E: ModbusPDU2B0EReadDeviceIdentificationRequest, 760 # 0x0D: ModbusPDU2B0DCANOpenGeneralReferenceRequest, 761 } 762 _mei_types_response = { 763 0x0E: ModbusPDU2B0EReadDeviceIdentificationResponse, 764 # 0x0D: ModbusPDU2B0DCANOpenGeneralReferenceResponse, 765 } 766 767 768 class ModbusADURequest(Packet): 769 name = "ModbusADU" 770 fields_desc = [XShortField("transId", 0x0000), # needs to be unique 771 XShortField("protoId", 0x0000), # needs to be zero (Modbus) 772 ShortField("len", None), # is calculated with payload 773 XByteField("unitId", 0xff)] # 0xFF (recommended as non-significant value) or 0x00 774 775 def guess_payload_class(self, payload): 776 function_code = orb(payload[0]) 777 778 if function_code == 0x2B: 779 sub_code = orb(payload[1]) 780 try: 781 return _mei_types_request[sub_code] 782 except KeyError: 783 pass 784 try: 785 return _modbus_request_classes[function_code] 786 except KeyError: 787 pass 788 if function_code in _reserved_funccode_request: 789 return ModbusPDUReservedFunctionCodeRequest 790 return ModbusPDUUserDefinedFunctionCodeRequest 791 792 def post_build(self, p, pay): 793 if self.len is None: 794 l = len(pay) + 1 # +len(p) 795 p = p[:4] + struct.pack("!H", l) + p[6:] 796 return p + pay 797 798 799 class ModbusADUResponse(Packet): 800 name = "ModbusADU" 801 fields_desc = [XShortField("transId", 0x0000), # needs to be unique 802 XShortField("protoId", 0x0000), # needs to be zero (Modbus) 803 ShortField("len", None), # is calculated with payload 804 XByteField("unitId", 0xff)] # 0xFF or 0x00 should be used for Modbus over TCP/IP 805 806 def guess_payload_class(self, payload): 807 function_code = orb(payload[0]) 808 809 if function_code == 0x2B: 810 sub_code = orb(payload[1]) 811 try: 812 return _mei_types_response[sub_code] 813 except KeyError: 814 pass 815 try: 816 return _modbus_response_classes[function_code] 817 except KeyError: 818 pass 819 try: 820 return _modbus_error_classes[function_code] 821 except KeyError: 822 pass 823 if function_code in _reserved_funccode_response: 824 return ModbusPDUReservedFunctionCodeResponse 825 elif function_code in _reserved_funccode_error: 826 return ModbusPDUReservedFunctionCodeError 827 if function_code < 0x80: 828 return ModbusPDUUserDefinedFunctionCodeResponse 829 return ModbusPDUUserDefinedFunctionCodeError 830 831 def post_build(self, p, pay): 832 if self.len is None: 833 l = len(pay) + 1 # +len(p) 834 p = p[:4] + struct.pack("!H", l) + p[6:] 835 return p + pay 836 837 838 bind_layers(TCP, ModbusADURequest, dport=502) 839 bind_layers(TCP, ModbusADUResponse, sport=502) 840