Home | History | Annotate | Download | only in contrib
      1 # coding: utf8
      2 
      3 # This file is part of Scapy
      4 # Scapy is free software: you can redistribute it and/or modify
      5 # it under the terms of the GNU General Public License as published by
      6 # the Free Software Foundation, either version 2 of the License, or
      7 # any later version.
      8 #
      9 # Scapy is distributed in the hope that it will be useful,
     10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
     11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
     12 # GNU General Public License for more details.
     13 #
     14 # You should have received a copy of the GNU General Public License
     15 # along with Scapy. If not, see <http://www.gnu.org/licenses/>.
     16 
     17 # scapy.contrib.description = ModBus Protocol
     18 # scapy.contrib.status = loads
     19 
     20 # Copyright (C) 2017 Arthur Gervais, Ken LE PRADO, Sbastien Mainand, Thomas Aurel
     21 
     22 from scapy.packet import *
     23 from scapy.fields import *
     24 from scapy.layers.inet import *
     25 
     26 # TODO: implement serial specific function codes
     27 
     28 _modbus_exceptions = {1: "Illegal Function Code",
     29                       2: "Illegal Data Address",
     30                       3: "Illegal Data Value",
     31                       4: "Server Device Failure",
     32                       5: "Acknowledge",
     33                       6: "Server Device Busy",
     34                       8: "Memory Parity Error",
     35                       10: "Gateway Path Unavailable",
     36                       11: "Gateway Target Device Failed to Respond"}
     37 
     38 
     39 class ModbusPDU01ReadCoilsRequest(Packet):
     40     name = "Read Coils Request"
     41     fields_desc = [XByteField("funcCode", 0x01),
     42                    XShortField("startAddr", 0x0000),  # 0x0000 to 0xFFFF
     43                    XShortField("quantity", 0x0001)]
     44 
     45     def extract_padding(self, s):
     46         return b"", None
     47 
     48 
     49 class ModbusPDU01ReadCoilsResponse(Packet):
     50     name = "Read Coils Response"
     51     fields_desc = [XByteField("funcCode", 0x01),
     52                    BitFieldLenField("byteCount", None, 8, count_of="coilStatus"),
     53                    FieldListField("coilStatus", [0x00], ByteField("", 0x00), count_from=lambda pkt: pkt.byteCount)]
     54 
     55     def extract_padding(self, s):
     56         return b"", None
     57 
     58 
     59 class ModbusPDU01ReadCoilsError(Packet):
     60     name = "Read Coils Exception"
     61     fields_desc = [XByteField("funcCode", 0x81),
     62                    ByteEnumField("exceptCode", 1, _modbus_exceptions)]
     63 
     64     def extract_padding(self, s):
     65         return b"", None
     66 
     67 
     68 class ModbusPDU02ReadDiscreteInputsRequest(Packet):
     69     name = "Read Discrete Inputs"
     70     fields_desc = [XByteField("funcCode", 0x02),
     71                    XShortField("startAddr", 0x0000),
     72                    XShortField("quantity", 0x0001)]
     73 
     74     def extract_padding(self, s):
     75         return b"", None
     76 
     77 
     78 class ModbusPDU02ReadDiscreteInputsResponse(Packet):
     79     """ inputStatus: result is represented as bytes, padded with 0 to have a
     80         integer number of bytes. The field does not parse this result and
     81         present the bytes directly
     82     """
     83     name = "Read Discrete Inputs Response"
     84     fields_desc = [XByteField("funcCode", 0x02),
     85                    BitFieldLenField("byteCount", None, 8, count_of="inputStatus"),
     86                    FieldListField("inputStatus", [0x00], ByteField("", 0x00), count_from=lambda pkt: pkt.byteCount)]
     87 
     88 
     89 class ModbusPDU02ReadDiscreteInputsError(Packet):
     90     name = "Read Discrete Inputs Exception"
     91     fields_desc = [XByteField("funcCode", 0x82),
     92                    ByteEnumField("exceptCode", 1, _modbus_exceptions)]
     93 
     94 
     95 class ModbusPDU03ReadHoldingRegistersRequest(Packet):
     96     name = "Read Holding Registers"
     97     fields_desc = [XByteField("funcCode", 0x03),
     98                    XShortField("startAddr", 0x0000),
     99                    XShortField("quantity", 0x0001)]
    100 
    101     def extract_padding(self, s):
    102         return b"", None
    103 
    104 
    105 class ModbusPDU03ReadHoldingRegistersResponse(Packet):
    106     name = "Read Holding Registers Response"
    107     fields_desc = [XByteField("funcCode", 0x03),
    108                    BitFieldLenField("byteCount", None, 8, count_of="registerVal", adjust=lambda pkt, x: x*2),
    109                    FieldListField("registerVal", [0x0000], ShortField("", 0x0000),
    110                                   count_from=lambda pkt: pkt.byteCount)]
    111 
    112 
    113 class ModbusPDU03ReadHoldingRegistersError(Packet):
    114     name = "Read Holding Registers Exception"
    115     fields_desc = [XByteField("funcCode", 0x83),
    116                    ByteEnumField("exceptCode", 1, _modbus_exceptions)]
    117 
    118 
    119 class ModbusPDU04ReadInputRegistersRequest(Packet):
    120     name = "Read Input Registers"
    121     fields_desc = [XByteField("funcCode", 0x04),
    122                    XShortField("startAddr", 0x0000),
    123                    XShortField("quantity", 0x0001)]
    124 
    125     def extract_padding(self, s):
    126         return b"", None
    127 
    128 
    129 class ModbusPDU04ReadInputRegistersResponse(Packet):
    130     name = "Read Input Registers Response"
    131     fields_desc = [XByteField("funcCode", 0x04),
    132                    BitFieldLenField("byteCount", None, 8, count_of="registerVal", adjust=lambda pkt, x: x*2),
    133                    FieldListField("registerVal", [0x0000], ShortField("", 0x0000),
    134                                   count_from=lambda pkt: pkt.byteCount)]
    135 
    136 
    137 class ModbusPDU04ReadInputRegistersError(Packet):
    138     name = "Read Input Registers Exception"
    139     fields_desc = [XByteField("funcCode", 0x84),
    140                    ByteEnumField("exceptCode", 1, _modbus_exceptions)]
    141 
    142 
    143 class ModbusPDU05WriteSingleCoilRequest(Packet):
    144     name = "Write Single Coil"
    145     fields_desc = [XByteField("funcCode", 0x05),
    146                    XShortField("outputAddr", 0x0000),  # from 0x0000 to 0xFFFF
    147                    XShortField("outputValue", 0x0000)]  # 0x0000 == Off, 0xFF00 == On
    148 
    149 
    150 class ModbusPDU05WriteSingleCoilResponse(Packet):  # The answer is the same as the request if successful
    151     name = "Write Single Coil"
    152     fields_desc = [XByteField("funcCode", 0x05),
    153                    XShortField("outputAddr", 0x0000),  # from 0x0000 to 0xFFFF
    154                    XShortField("outputValue", 0x0000)]  # 0x0000 == Off, 0xFF00 == On
    155 
    156 
    157 class ModbusPDU05WriteSingleCoilError(Packet):
    158     name = "Write Single Coil Exception"
    159     fields_desc = [XByteField("funcCode", 0x85),
    160                    ByteEnumField("exceptCode", 1, _modbus_exceptions)]
    161 
    162 
    163 class ModbusPDU06WriteSingleRegisterRequest(Packet):
    164     name = "Write Single Register"
    165     fields_desc = [XByteField("funcCode", 0x06),
    166                    XShortField("registerAddr", 0x0000),
    167                    XShortField("registerValue", 0x0000)]
    168 
    169     def extract_padding(self, s):
    170         return b"", None
    171 
    172 
    173 class ModbusPDU06WriteSingleRegisterResponse(Packet):
    174     name = "Write Single Register Response"
    175     fields_desc = [XByteField("funcCode", 0x06),
    176                    XShortField("registerAddr", 0x0000),
    177                    XShortField("registerValue", 0x0000)]
    178 
    179 
    180 class ModbusPDU06WriteSingleRegisterError(Packet):
    181     name = "Write Single Register Exception"
    182     fields_desc = [XByteField("funcCode", 0x86),
    183                    ByteEnumField("exceptCode", 1, _modbus_exceptions)]
    184 
    185 
    186 class ModbusPDU07ReadExceptionStatusRequest(Packet):
    187     name = "Read Exception Status"
    188     fields_desc = [XByteField("funcCode", 0x07)]
    189 
    190     def extract_padding(self, s):
    191         return b"", None
    192 
    193 
    194 class ModbusPDU07ReadExceptionStatusResponse(Packet):
    195     name = "Read Exception Status Response"
    196     fields_desc = [XByteField("funcCode", 0x07),
    197                    XByteField("startingAddr", 0x00)]
    198 
    199 
    200 class ModbusPDU07ReadExceptionStatusError(Packet):
    201     name = "Read Exception Status Exception"
    202     fields_desc = [XByteField("funcCode", 0x87),
    203                    ByteEnumField("exceptCode", 1, _modbus_exceptions)]
    204 
    205 
    206 class ModbusPDU0FWriteMultipleCoilsRequest(Packet):
    207     name = "Write Multiple Coils"
    208     fields_desc = [XByteField("funcCode", 0x0F),
    209                    XShortField("startingAddr", 0x0000),
    210                    XShortField("quantityOutput", 0x0001),
    211                    BitFieldLenField("byteCount", None, 8, count_of="outputsValue"),
    212                    FieldListField("outputsValue", [0x00], XByteField("", 0x00), count_from=lambda pkt: pkt.byteCount)]
    213 
    214     def extract_padding(self, s):
    215         return b"", None
    216 
    217 
    218 class ModbusPDU0FWriteMultipleCoilsResponse(Packet):
    219     name = "Write Multiple Coils Response"
    220     fields_desc = [XByteField("funcCode", 0x0F),
    221                    XShortField("startingAddr", 0x0000),
    222                    XShortField("quantityOutput", 0x0001)]
    223 
    224 
    225 class ModbusPDU0FWriteMultipleCoilsError(Packet):
    226     name = "Write Multiple Coils Exception"
    227     fields_desc = [XByteField("funcCode", 0x8F),
    228                    ByteEnumField("exceptCode", 1, _modbus_exceptions)]
    229 
    230 
    231 class ModbusPDU10WriteMultipleRegistersRequest(Packet):
    232     name = "Write Multiple Registers"
    233     fields_desc = [XByteField("funcCode", 0x10),
    234                    XShortField("startingAddr", 0x0000),
    235                    BitFieldLenField("quantityRegisters", None, 16, count_of="outputsValue",),
    236                    BitFieldLenField("byteCount", None, 8, count_of="outputsValue", adjust=lambda pkt, x: x*2),
    237                    FieldListField("outputsValue", [0x0000], XShortField("", 0x0000),
    238                                   count_from=lambda pkt: pkt.byteCount)]
    239 
    240 
    241 class ModbusPDU10WriteMultipleRegistersResponse(Packet):
    242     name = "Write Multiple Registers Response"
    243     fields_desc = [XByteField("funcCode", 0x10),
    244                    XShortField("startingAddr", 0x0000),
    245                    XShortField("quantityRegisters", 0x0001)]
    246 
    247 
    248 class ModbusPDU10WriteMultipleRegistersError(Packet):
    249     name = "Write Multiple Registers Exception"
    250     fields_desc = [XByteField("funcCode", 0x90),
    251                    ByteEnumField("exceptCode", 1, _modbus_exceptions)]
    252 
    253 
    254 class ModbusPDU11ReportSlaveIdRequest(Packet):
    255     name = "Report Slave Id"
    256     fields_desc = [XByteField("funcCode", 0x11)]
    257 
    258     def extract_padding(self, s):
    259         return b"", None
    260 
    261 
    262 class ModbusPDU11ReportSlaveIdResponse(Packet):
    263     name = "Report Slave Id Response"
    264     fields_desc = [XByteField("funcCode", 0x11),
    265                    BitFieldLenField("byteCount", None, 8, length_of="slaveId"),
    266                    ConditionalField(StrLenField("slaveId", "", length_from=lambda pkt: pkt.byteCount),
    267                                     lambda pkt: pkt.byteCount > 0),
    268                    ConditionalField(XByteField("runIdicatorStatus", 0x00), lambda pkt: pkt.byteCount > 0)]
    269 
    270 
    271 class ModbusPDU11ReportSlaveIdError(Packet):
    272     name = "Report Slave Id Exception"
    273     fields_desc = [XByteField("funcCode", 0x91),
    274                    ByteEnumField("exceptCode", 1, _modbus_exceptions)]
    275 
    276 
    277 class ModbusReadFileSubRequest(Packet):
    278     name = "Sub-request of Read File Record"
    279     fields_desc = [ByteField("refType", 0x06),
    280                    ShortField("fileNumber", 0x0001),
    281                    ShortField("recordNumber", 0x0000),
    282                    ShortField("recordLength", 0x0001)]
    283 
    284     def guess_payload_class(self, payload):
    285         return ModbusReadFileSubRequest
    286 
    287 
    288 class ModbusPDU14ReadFileRecordRequest(Packet):
    289     name = "Read File Record"
    290     fields_desc = [XByteField("funcCode", 0x14),
    291                    ByteField("byteCount", None)]
    292 
    293     def guess_payload_class(self, payload):
    294         if self.byteCount > 0:
    295             return ModbusReadFileSubRequest
    296         else:
    297             return Packet.guess_payload_class(self, payload)
    298 
    299     def post_build(self, p, pay):
    300         if self.byteCount is None:
    301             l = len(pay)
    302             p = p[:1] + struct.pack("!B", l) + p[3:]
    303         return p + pay
    304 
    305 
    306 class ModbusReadFileSubResponse(Packet):
    307     name = "Sub-response"
    308     fields_desc = [BitFieldLenField("respLength", None, 8, count_of="recData", adjust=lambda pkt, p: p*2+1),
    309                    ByteField("refType", 0x06),
    310                    FieldListField("recData", [0x0000], XShortField("", 0x0000),
    311                                   count_from=lambda pkt: (pkt.respLength-1)//2)]
    312 
    313     def guess_payload_class(self, payload):
    314         return ModbusReadFileSubResponse
    315 
    316 
    317 class ModbusPDU14ReadFileRecordResponse(Packet):
    318     name = "Read File Record Response"
    319     fields_desc = [XByteField("funcCode", 0x14),
    320                    ByteField("dataLength", None)]
    321 
    322     def post_build(self, p, pay):
    323         if self.dataLength is None:
    324             l = len(pay)
    325             p = p[:1] + struct.pack("!B", l) + p[3:]
    326         return p + pay
    327 
    328     def guess_payload_class(self, payload):
    329         if self.dataLength > 0:
    330             return ModbusReadFileSubResponse
    331         else:
    332             return Packet.guess_payload_class(self, payload)
    333 
    334 
    335 class ModbusPDU14ReadFileRecordError(Packet):
    336     name = "Read File Record Exception"
    337     fields_desc = [XByteField("funcCode", 0x94),
    338                    ByteEnumField("exceptCode", 1, _modbus_exceptions)]
    339 
    340 
    341 # 0x15 : Write File Record
    342 class ModbusWriteFileSubRequest(Packet):
    343     name = "Sub request of Write File Record"
    344     fields_desc = [ByteField("refType", 0x06),
    345                    ShortField("fileNumber", 0x0001),
    346                    ShortField("recordNumber", 0x0000),
    347                    BitFieldLenField("recordLength", None, 16, length_of="recordData", adjust=lambda pkt, p: p//2),
    348                    FieldListField("recordData", [0x0000], ShortField("", 0x0000),
    349                                   length_from=lambda pkt: pkt.recordLength*2)]
    350 
    351     def guess_payload_class(self, payload):
    352         if payload:
    353             return ModbusWriteFileSubRequest
    354 
    355 
    356 class ModbusPDU15WriteFileRecordRequest(Packet):
    357     name = "Write File Record"
    358     fields_desc = [XByteField("funcCode", 0x15),
    359                    ByteField("dataLength", None)]
    360 
    361     def post_build(self, p, pay):
    362         if self.dataLength is None:
    363             l = len(pay)
    364             p = p[:1] + struct.pack("!B", l) + p[3:]
    365             return p + pay
    366 
    367     def guess_payload_class(self, payload):
    368         if self.dataLength > 0:
    369             return ModbusWriteFileSubRequest
    370         else:
    371             return Packet.guess_payload_class(self, payload)
    372 
    373 
    374 class ModbusWriteFileSubResponse(ModbusWriteFileSubRequest):
    375     name = "Sub response of Write File Record"
    376 
    377     def guess_payload_class(self, payload):
    378         if payload:
    379             return ModbusWriteFileSubResponse
    380 
    381 
    382 class ModbusPDU15WriteFileRecordResponse(ModbusPDU15WriteFileRecordRequest):
    383     name = "Write File Record Response"
    384 
    385     def guess_payload_class(self, payload):
    386         if self.dataLength > 0:
    387             return ModbusWriteFileSubResponse
    388         else:
    389             return Packet.guess_payload_class(self, payload)
    390 
    391 
    392 class ModbusPDU15WriteFileRecordError(Packet):
    393     name = "Write File Record Exception"
    394     fields_desc = [XByteField("funcCode", 0x95),
    395                    ByteEnumField("exceptCode", 1, _modbus_exceptions)]
    396 
    397 
    398 class ModbusPDU16MaskWriteRegisterRequest(Packet):
    399     # and/or to 0xFFFF/0x0000 so that nothing is changed in memory
    400     name = "Mask Write Register"
    401     fields_desc = [XByteField("funcCode", 0x16),
    402                    XShortField("refAddr", 0x0000),
    403                    XShortField("andMask", 0xffff),
    404                    XShortField("orMask", 0x0000)]
    405 
    406 
    407 class ModbusPDU16MaskWriteRegisterResponse(Packet):
    408     name = "Mask Write Register Response"
    409     fields_desc = [XByteField("funcCode", 0x16),
    410                    XShortField("refAddr", 0x0000),
    411                    XShortField("andMask", 0xffff),
    412                    XShortField("orMask", 0x0000)]
    413 
    414 
    415 class ModbusPDU16MaskWriteRegisterError(Packet):
    416     name = "Mask Write Register Exception"
    417     fields_desc = [XByteField("funcCode", 0x96),
    418                    ByteEnumField("exceptCode", 1, _modbus_exceptions)]
    419 
    420 
    421 class ModbusPDU17ReadWriteMultipleRegistersRequest(Packet):
    422     name = "Read Write Multiple Registers"
    423     fields_desc = [XByteField("funcCode", 0x17),
    424                    XShortField("readStartingAddr", 0x0000),
    425                    XShortField("readQuantityRegisters", 0x0001),
    426                    XShortField("writeStartingAddr", 0x0000),
    427                    BitFieldLenField("writeQuantityRegisters", None, 16, count_of="writeRegistersValue"),
    428                    BitFieldLenField("byteCount", None, 8, count_of="writeRegistersValue", adjust=lambda pkt, x: x*2),
    429                    FieldListField("writeRegistersValue", [0x0000], XShortField("", 0x0000),
    430                                   count_from=lambda pkt: pkt.byteCount)]
    431 
    432 
    433 class ModbusPDU17ReadWriteMultipleRegistersResponse(Packet):
    434     name = "Read Write Multiple Registers Response"
    435     fields_desc = [XByteField("funcCode", 0x17),
    436                    BitFieldLenField("byteCount", None, 8, count_of="registerVal", adjust=lambda pkt, x: x*2),
    437                    FieldListField("registerVal", [0x0000], ShortField("", 0x0000),
    438                                   count_from=lambda pkt: pkt.byteCount)]
    439 
    440 
    441 class ModbusPDU17ReadWriteMultipleRegistersError(Packet):
    442     name = "Read Write Multiple Exception"
    443     fields_desc = [XByteField("funcCode", 0x97),
    444                    ByteEnumField("exceptCode", 1, _modbus_exceptions)]
    445 
    446 
    447 class ModbusPDU18ReadFIFOQueueRequest(Packet):
    448     name = "Read FIFO Queue"
    449     fields_desc = [XByteField("funcCode", 0x18),
    450                    XShortField("FIFOPointerAddr", 0x0000)]
    451 
    452 
    453 class ModbusPDU18ReadFIFOQueueResponse(Packet):
    454     name = "Read FIFO Queue Response"
    455     fields_desc = [XByteField("funcCode", 0x18),
    456                    # TODO: ByteCount must includes size of FIFOCount
    457                    BitFieldLenField("byteCount", None, 16, count_of="FIFOVal", adjust=lambda pkt, p: p*2+2),
    458                    BitFieldLenField("FIFOCount", None, 16, count_of="FIFOVal"),
    459                    FieldListField("FIFOVal", [], ShortField("", 0x0000), count_from=lambda pkt: pkt.byteCount)]
    460 
    461 
    462 class ModbusPDU18ReadFIFOQueueError(Packet):
    463     name = "Read FIFO Queue Exception"
    464     fields_desc = [XByteField("funcCode", 0x98),
    465                    ByteEnumField("exceptCode", 1, _modbus_exceptions)]
    466 
    467 
    468 # TODO: not implemented, out of the main specification
    469 # class ModbusPDU2B0DCANOpenGeneralReferenceRequest(Packet):
    470 #     name = "CANopen General Reference Request"
    471 #     fields_desc = []
    472 #
    473 #
    474 # class ModbusPDU2B0DCANOpenGeneralReferenceResponse(Packet):
    475 #     name = "CANopen General Reference Response"
    476 #     fields_desc = []
    477 #
    478 #
    479 # class ModbusPDU2B0DCANOpenGeneralReferenceError(Packet):
    480 #     name = "CANopen General Reference Error"
    481 #     fields_desc = []
    482 
    483 
    484 # 0x2B/0x0E - Read Device Identification values
    485 _read_device_id_codes = {1: "Basic",
    486                          2: "Regular",
    487                          3: "Extended",
    488                          4: "Specific"}
    489 # 0x00->0x02: mandatory
    490 # 0x03->0x06: optional
    491 # 0x07->0x7F: Reserved (optional)
    492 # 0x80->0xFF: product dependent private objects (optional)
    493 _read_device_id_object_id = {0x00: "VendorName",
    494                              0x01: "ProductCode",
    495                              0x02: "MajorMinorRevision",
    496                              0x03: "VendorUrl",
    497                              0x04: "ProductName",
    498                              0x05: "ModelName",
    499                              0x06: "UserApplicationName"}
    500 _read_device_id_conformity_lvl = {0x01: "Basic Identification (stream only)",
    501                                   0x02: "Regular Identification (stream only)",
    502                                   0x03: "Extended Identification (stream only)",
    503                                   0x81: "Basic Identification (stream and individual access)",
    504                                   0x82: "Regular Identification (stream and individual access)",
    505                                   0x83: "Extended Identification (stream and individual access)"}
    506 _read_device_id_more_follow = {0x00: "No",
    507                                0x01: "Yes"}
    508 
    509 
    510 class ModbusPDU2B0EReadDeviceIdentificationRequest(Packet):
    511     name = "Read Device Identification"
    512     fields_desc = [XByteField("funcCode", 0x2B),
    513                    XByteField("MEIType", 0x0E),
    514                    ByteEnumField("readCode", 1, _read_device_id_codes),
    515                    ByteEnumField("objectId", 0x00, _read_device_id_object_id)]
    516 
    517 
    518 class ModbusPDU2B0EReadDeviceIdentificationResponse(Packet):
    519     name = "Read Device Identification"
    520     fields_desc = [XByteField("funcCode", 0x2B),
    521                    XByteField("MEIType", 0x0E),
    522                    ByteEnumField("readCode", 4, _read_device_id_codes),
    523                    ByteEnumField("conformityLevel", 0x01, _read_device_id_conformity_lvl),
    524                    ByteEnumField("more", 0x00, _read_device_id_more_follow),
    525                    ByteEnumField("nextObjId", 0x00, _read_device_id_object_id),
    526                    ByteField("objCount", 0x00)]
    527 
    528     def guess_payload_class(self, payload):
    529         if self.objCount > 0:
    530             return ModbusObjectId
    531         else:
    532             return Packet.guess_payload_class(self, payload)
    533 
    534 
    535 class ModbusPDU2B0EReadDeviceIdentificationError(Packet):
    536     name = "Read Exception Status Exception"
    537     fields_desc = [XByteField("funcCode", 0xAB),
    538                    ByteEnumField("exceptCode", 1, _modbus_exceptions)]
    539 
    540 
    541 _reserved_funccode_request = {
    542     0x08: '0x08 Unknown Reserved Request',
    543     0x09: '0x09 Unknown Reserved Request',
    544     0x0A: '0x0a Unknown Reserved Request',
    545     0x0D: '0x0d Unknown Reserved Request',
    546     0x0E: '0x0e Unknown Reserved Request',
    547     0x29: '0x29 Unknown Reserved Request',
    548     0x2A: '0x2a Unknown Reserved Request',
    549     0x5A: 'Specific Schneider Electric Request',
    550     0x5B: '0x5b Unknown Reserved Request',
    551     0x7D: '0x7d Unknown Reserved Request',
    552     0x7E: '0x7e Unknown Reserved Request',
    553     0x7F: '0x7f Unknown Reserved Request',
    554 }
    555 
    556 _reserved_funccode_response = {
    557     0x08: '0x08 Unknown Reserved Response',
    558     0x09: '0x09 Unknown Reserved Response',
    559     0x0A: '0x0a Unknown Reserved Response',
    560     0x0D: '0x0d Unknown Reserved Response',
    561     0x0E: '0x0e Unknown Reserved Response',
    562     0x29: '0x29 Unknown Reserved Response',
    563     0x2A: '0x2a Unknown Reserved Response',
    564     0x5A: 'Specific Schneider Electric Response',
    565     0x5B: '0x5b Unknown Reserved Response',
    566     0x7D: '0x7d Unknown Reserved Response',
    567     0x7E: '0x7e Unknown Reserved Response',
    568     0x7F: '0x7f Unknown Reserved Response',
    569 }
    570 
    571 _reserved_funccode_error = {
    572     0x88: '0x88 Unknown Reserved Error',
    573     0x89: '0x89 Unknown Reserved Error',
    574     0x8A: '0x8a Unknown Reserved Error',
    575     0x8D: '0x8d Unknown Reserved Error',
    576     0x8E: '0x8e Unknown Reserved Error',
    577     0xA9: '0x88 Unknown Reserved Error',
    578     0xAA: '0x88 Unknown Reserved Error',
    579     0xDA: 'Specific Schneider Electric Error',
    580     0xDB: '0xdb Unknown Reserved Error',
    581     0xDC: '0xdc Unknown Reserved Error',
    582     0xFD: '0xfd Unknown Reserved Error',
    583     0xFE: '0xfe Unknown Reserved Error',
    584     0xFF: '0xff Unknown Reserved Error',
    585 }
    586 
    587 
    588 class ModbusPDUReservedFunctionCodeRequest(Packet):
    589     name = "Reserved Function Code Request"
    590     fields_desc = [
    591             ByteEnumField("funcCode", 0x00, _reserved_funccode_request),
    592             StrFixedLenField('payload', '', 255), ]
    593 
    594     def extract_padding(self, s):
    595         return b"", None
    596 
    597     def mysummary(self):
    598         return self.sprintf("Modbus Reserved Request %funcCode%")
    599 
    600 
    601 class ModbusPDUReservedFunctionCodeResponse(Packet):
    602     name = "Reserved Function Code Response"
    603     fields_desc = [
    604             ByteEnumField("funcCode", 0x00, _reserved_funccode_response),
    605             StrFixedLenField('payload', '', 255), ]
    606 
    607     def extract_padding(self, s):
    608         return b"", None
    609 
    610     def mysummary(self):
    611         return self.sprintf("Modbus Reserved Response %funcCode%")
    612 
    613 
    614 class ModbusPDUReservedFunctionCodeError(Packet):
    615     name = "Reserved Function Code Error"
    616     fields_desc = [
    617             ByteEnumField("funcCode", 0x00, _reserved_funccode_error),
    618             StrFixedLenField('payload', '', 255), ]
    619 
    620     def extract_padding(self, s):
    621         return b"", None
    622 
    623     def mysummary(self):
    624         return self.sprintf("Modbus Reserved Error %funcCode%")
    625 
    626 
    627 _userdefined_funccode_request = {
    628 }
    629 _userdefined_funccode_response = {
    630 }
    631 _userdefined_funccode_error = {
    632 }
    633 
    634 
    635 class ModbusByteEnumField(EnumField):
    636     __slots__ = "defEnum"
    637 
    638     def __init__(self, name, default, enum, defEnum):
    639         EnumField.__init__(self, name, default, enum, "B")
    640         defEnum = self.defEnum = defEnum
    641 
    642     def i2repr_one(self, pkt, x):
    643         if self not in conf.noenum and not isinstance(x, VolatileValue) \
    644                     and x in self.i2s:
    645             return self.i2s[x]
    646         if self.defEnum:
    647             return self.defEnum
    648         return repr(x)
    649 
    650 
    651 class ModbusPDUUserDefinedFunctionCodeRequest(Packet):
    652     name = "User-Defined Function Code Request"
    653     fields_desc = [
    654             ModbusByteEnumField(
    655                 "funcCode", 0x00, _userdefined_funccode_request,
    656                 "Unknown user-defined request function Code"),
    657             StrFixedLenField('payload', '', 255), ]
    658 
    659     def extract_padding(self, s):
    660         return b"", None
    661 
    662     def mysummary(self):
    663         return self.sprintf("Modbus User-Defined Request %funcCode%")
    664 
    665 
    666 class ModbusPDUUserDefinedFunctionCodeResponse(Packet):
    667     name = "User-Defined Function Code Response"
    668     fields_desc = [
    669             ModbusByteEnumField(
    670                 "funcCode", 0x00, _userdefined_funccode_response,
    671                 "Unknown user-defined response function Code"),
    672             StrFixedLenField('payload', '', 255), ]
    673 
    674     def extract_padding(self, s):
    675         return b"", None
    676 
    677     def mysummary(self):
    678         return self.sprintf("Modbus User-Defined Response %funcCode%")
    679 
    680 
    681 class ModbusPDUUserDefinedFunctionCodeError(Packet):
    682     name = "User-Defined Function Code Error"
    683     fields_desc = [
    684             ModbusByteEnumField(
    685                 "funcCode", 0x00, _userdefined_funccode_error,
    686                 "Unknown user-defined error function Code"),
    687             StrFixedLenField('payload', '', 255), ]
    688 
    689     def extract_padding(self, s):
    690         return b"", None
    691 
    692     def mysummary(self):
    693         return self.sprintf("Modbus User-Defined Error %funcCode%")
    694 
    695 
    696 class ModbusObjectId(Packet):
    697     name = "Object"
    698     fields_desc = [ByteEnumField("id", 0x00, _read_device_id_object_id),
    699                    BitFieldLenField("length", None, 8, length_of="value"),
    700                    StrLenField("value", "", length_from=lambda pkt: pkt.length)]
    701 
    702     def guess_payload_class(self, payload):
    703         return ModbusObjectId
    704 
    705 
    706 _modbus_request_classes = {
    707     0x01: ModbusPDU01ReadCoilsRequest,
    708     0x02: ModbusPDU02ReadDiscreteInputsRequest,
    709     0x03: ModbusPDU03ReadHoldingRegistersRequest,
    710     0x04: ModbusPDU04ReadInputRegistersRequest,
    711     0x05: ModbusPDU05WriteSingleCoilRequest,
    712     0x06: ModbusPDU06WriteSingleRegisterRequest,
    713     0x07: ModbusPDU07ReadExceptionStatusRequest,
    714     0x0F: ModbusPDU0FWriteMultipleCoilsRequest,
    715     0x10: ModbusPDU10WriteMultipleRegistersRequest,
    716     0x11: ModbusPDU11ReportSlaveIdRequest,
    717     0x14: ModbusPDU14ReadFileRecordRequest,
    718     0x15: ModbusPDU15WriteFileRecordRequest,
    719     0x16: ModbusPDU16MaskWriteRegisterRequest,
    720     0x17: ModbusPDU17ReadWriteMultipleRegistersRequest,
    721     0x18: ModbusPDU18ReadFIFOQueueRequest,
    722 }
    723 _modbus_error_classes = {
    724     0x81: ModbusPDU01ReadCoilsError,
    725     0x82: ModbusPDU02ReadDiscreteInputsError,
    726     0x83: ModbusPDU03ReadHoldingRegistersError,
    727     0x84: ModbusPDU04ReadInputRegistersError,
    728     0x85: ModbusPDU05WriteSingleCoilError,
    729     0x86: ModbusPDU06WriteSingleRegisterError,
    730     0x87: ModbusPDU07ReadExceptionStatusError,
    731     0x8F: ModbusPDU0FWriteMultipleCoilsError,
    732     0x90: ModbusPDU10WriteMultipleRegistersError,
    733     0x91: ModbusPDU11ReportSlaveIdError,
    734     0x94: ModbusPDU14ReadFileRecordError,
    735     0x95: ModbusPDU15WriteFileRecordError,
    736     0x96: ModbusPDU16MaskWriteRegisterError,
    737     0x97: ModbusPDU17ReadWriteMultipleRegistersError,
    738     0x98: ModbusPDU18ReadFIFOQueueError,
    739     0xAB: ModbusPDU2B0EReadDeviceIdentificationError,
    740 }
    741 _modbus_response_classes = {
    742     0x01: ModbusPDU01ReadCoilsResponse,
    743     0x02: ModbusPDU02ReadDiscreteInputsResponse,
    744     0x03: ModbusPDU03ReadHoldingRegistersResponse,
    745     0x04: ModbusPDU04ReadInputRegistersResponse,
    746     0x05: ModbusPDU05WriteSingleCoilResponse,
    747     0x06: ModbusPDU06WriteSingleRegisterResponse,
    748     0x07: ModbusPDU07ReadExceptionStatusResponse,
    749     0x0F: ModbusPDU0FWriteMultipleCoilsResponse,
    750     0x10: ModbusPDU10WriteMultipleRegistersResponse,
    751     0x11: ModbusPDU11ReportSlaveIdResponse,
    752     0x14: ModbusPDU14ReadFileRecordResponse,
    753     0x15: ModbusPDU15WriteFileRecordResponse,
    754     0x16: ModbusPDU16MaskWriteRegisterResponse,
    755     0x17: ModbusPDU17ReadWriteMultipleRegistersResponse,
    756     0x18: ModbusPDU18ReadFIFOQueueResponse,
    757 }
    758 _mei_types_request = {
    759     0x0E: ModbusPDU2B0EReadDeviceIdentificationRequest,
    760     # 0x0D: ModbusPDU2B0DCANOpenGeneralReferenceRequest,
    761 }
    762 _mei_types_response = {
    763     0x0E: ModbusPDU2B0EReadDeviceIdentificationResponse,
    764     # 0x0D: ModbusPDU2B0DCANOpenGeneralReferenceResponse,
    765 }
    766 
    767 
    768 class ModbusADURequest(Packet):
    769     name = "ModbusADU"
    770     fields_desc = [XShortField("transId", 0x0000),  # needs to be unique
    771                    XShortField("protoId", 0x0000),  # needs to be zero (Modbus)
    772                    ShortField("len", None),  # is calculated with payload
    773                    XByteField("unitId", 0xff)]  # 0xFF (recommended as non-significant value) or 0x00
    774 
    775     def guess_payload_class(self, payload):
    776         function_code = orb(payload[0])
    777 
    778         if function_code == 0x2B:
    779             sub_code = orb(payload[1])
    780             try:
    781                 return _mei_types_request[sub_code]
    782             except KeyError:
    783                 pass
    784         try:
    785             return _modbus_request_classes[function_code]
    786         except KeyError:
    787             pass
    788         if function_code in _reserved_funccode_request:
    789             return ModbusPDUReservedFunctionCodeRequest
    790         return ModbusPDUUserDefinedFunctionCodeRequest
    791 
    792     def post_build(self, p, pay):
    793         if self.len is None:
    794             l = len(pay) + 1  # +len(p)
    795             p = p[:4] + struct.pack("!H", l) + p[6:]
    796         return p + pay
    797 
    798 
    799 class ModbusADUResponse(Packet):
    800     name = "ModbusADU"
    801     fields_desc = [XShortField("transId", 0x0000),  # needs to be unique
    802                    XShortField("protoId", 0x0000),  # needs to be zero (Modbus)
    803                    ShortField("len", None),  # is calculated with payload
    804                    XByteField("unitId", 0xff)]  # 0xFF or 0x00 should be used for Modbus over TCP/IP
    805 
    806     def guess_payload_class(self, payload):
    807         function_code = orb(payload[0])
    808 
    809         if function_code == 0x2B:
    810             sub_code = orb(payload[1])
    811             try:
    812                 return _mei_types_response[sub_code]
    813             except KeyError:
    814                 pass
    815         try:
    816             return _modbus_response_classes[function_code]
    817         except KeyError:
    818             pass
    819         try:
    820             return _modbus_error_classes[function_code]
    821         except KeyError:
    822             pass
    823         if function_code in _reserved_funccode_response:
    824             return ModbusPDUReservedFunctionCodeResponse
    825         elif function_code in _reserved_funccode_error:
    826             return ModbusPDUReservedFunctionCodeError
    827         if function_code < 0x80:
    828             return ModbusPDUUserDefinedFunctionCodeResponse
    829         return ModbusPDUUserDefinedFunctionCodeError
    830 
    831     def post_build(self, p, pay):
    832         if self.len is None:
    833             l = len(pay) + 1  # +len(p)
    834             p = p[:4] + struct.pack("!H", l) + p[6:]
    835         return p + pay
    836 
    837 
    838 bind_layers(TCP, ModbusADURequest, dport=502)
    839 bind_layers(TCP, ModbusADUResponse, sport=502)
    840