Parsing Text-Based Protocols
Text-based communication protocols are often used over UART in embedded applications for command and control interfaces. The protocols used by many devices do not follow the layering of the OSI model; instead, they combine the functions of several layers, from the data link layer to the application layer, into a single protocol definition.
Unfortunately, many of the protocols that I have seen are poorly thought out, implement weak checksums, or lack proper delimitation.
Features of Text-Based (Data Link Layer) Protocols
Synchronization
Synchronization ensures that the beginning of a frame can be identified. It is particularly important to be able to detect the beginning of frames if
- frames are truncated (loss of power),
- frame length is not known in advance, or
- noise is interpreted as data, for example during initialization or reset.
A specific byte sequence can be used as a frame start indicator to synchronize on. If that byte sequence can also occur in the frame's content, it can either be escaped (as in HDLC), or a checksum can be used to verify correct reception. If the checksum does not match, the frame is discarded and has to be resent.
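The escaping approach can be sketched as follows. This is a minimal illustration, not a full HDLC implementation: the constants follow the byte-stuffing convention of HDLC-like asynchronous framing (flag 0x7E, escape 0x7D, XOR mask 0x20), while the function names `stuff` and `unstuff` are assumptions made for this example.

```python
FLAG = 0x7E      # frame start marker
ESC = 0x7D       # escape marker
ESC_XOR = 0x20   # escaped bytes are XORed with this mask


def stuff(payload: bytes) -> bytes:
    """Return a frame: FLAG followed by the escaped payload."""
    out = bytearray([FLAG])
    for b in payload:
        if b in (FLAG, ESC):
            # Reserved byte inside the content: send ESC plus the masked byte
            out += bytes([ESC, b ^ ESC_XOR])
        else:
            out.append(b)
    return bytes(out)


def unstuff(frame: bytes) -> bytes:
    """Reverse stuff(): drop the leading FLAG and undo the escaping."""
    if not frame or frame[0] != FLAG:
        raise ValueError("frame does not start with FLAG")
    out = bytearray()
    escaped = False
    for b in frame[1:]:
        if escaped:
            out.append(b ^ ESC_XOR)
            escaped = False
        elif b == ESC:
            escaped = True
        else:
            out.append(b)
    if escaped:
        raise ValueError("frame ends in the middle of an escape sequence")
    return bytes(out)


payload = bytes([0x01, FLAG, 0x02, ESC, 0x03])
frame = stuff(payload)
```

After stuffing, the flag byte appears exactly once in `frame`, so a receiver can resynchronize on it at any time.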
Header, Length Fields
Depending on the application, the frame content may be subdivided into a header and a message body. The header typically has a fixed size and contains the information that is necessary to receive and process the message body. A length field contains the message body's length; a type field indicates the message body's format.
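As an illustration, a fixed-size header can be split off and interpreted before the body is touched. The layout used here (one type character followed by three decimal length digits) and the names `Header`, `parse_header`, and `split_frame` are hypothetical and only serve this sketch.

```python
from typing import NamedTuple

HEADER_SIZE = 4  # 1 type character + 3 decimal length digits (assumed layout)


class Header(NamedTuple):
    msg_type: str
    body_length: int


def parse_header(raw: bytes) -> Header:
    """Parse a fixed-size header such as b'T012'."""
    if len(raw) != HEADER_SIZE:
        raise ValueError("header has wrong size")
    type_field, length_field = raw[:1], raw[1:]
    if not type_field.isalpha() or not length_field.isdigit():
        raise ValueError("malformed header")
    return Header(type_field.decode("ascii"), int(length_field))


def split_frame(frame: bytes):
    """Return (header, body); the length field decides where the body ends."""
    header = parse_header(frame[:HEADER_SIZE])
    body = frame[HEADER_SIZE:HEADER_SIZE + header.body_length]
    if len(body) != header.body_length:
        raise ValueError("body is shorter than announced")
    return header, body


header, body = split_frame(b"T005hello")
```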
Checksum
The purpose of a checksum is to enable the receiver of the data to check its integrity. The checksum should always take into account the frame header and the frame body. When designing a protocol, be sure to follow best practices when choosing the checksum algorithm; see [1]. Do not use a "sum-of-all-bytes" style checksum! Among other weaknesses, such a sum does not change when bytes are reordered.
pycrc [2] generates C source code for a multitude of CRC variants.
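The weakness of a byte sum can be demonstrated with a short sketch. It compares a sum modulo 256 with the 16-bit CRC-CCITT from Python's standard `binascii` module; the helper names and the sample command strings are made up for this example.

```python
import binascii


def sum_of_bytes(data: bytes) -> int:
    """Naive 8-bit checksum: sum of all bytes modulo 256."""
    return sum(data) % 256


def crc16(data: bytes) -> int:
    """CRC-CCITT (polynomial 0x1021) with initial value 0xFFFF."""
    return binascii.crc_hqx(data, 0xFFFF)


original = b"SET 12"
swapped = b"SET 21"   # two adjacent bytes transposed

# The sum is independent of byte order, so the transposition goes unnoticed
same_sum = sum_of_bytes(original) == sum_of_bytes(swapped)

# A 16-bit CRC detects every error confined to 16 consecutive bits
same_crc = crc16(original) == crc16(swapped)
```

Here `same_sum` is true while `same_crc` is false: the receiver would accept "SET 21" in place of "SET 12" if only the byte sum were checked.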
Error Handling
The type of error handling that is used depends on the application. It may be sufficient to simply discard erroneous messages or to print an error message, but it may also be necessary to buffer frames so that they can be retransmitted on request.
In any case, it is important to consider all types of errors that may occur (wrong checksums, wrong format, wrong synchronization, …) so that the software module handling the frames never enters an unexpected state.
A Sample Framing Format
Let's implement a framing format for text messages that incorporates the mentioned characteristics.
The frame format has a length field, a message field, and a checksum field. A frame starts with the synchronization character '$'. The length field, message field, and checksum field are separated by delimiters (':'). Although a delimiter is not required when the field lengths are known, delimiters can increase readability and allow for the later introduction of variable-length fields. A frame is terminated with a linefeed ('\n') character.
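A frame in this format could be produced as sketched below. The field widths are assumptions taken from what the parser listed later in this article expects: a three-digit decimal length, and a four-digit hexadecimal CRC-CCITT (initial value 0xFFFF) computed over the message field only. The function name `encode_frame` is hypothetical.

```python
import binascii

SYNC = b"$"
DELIMITER = b":"
FRAME_END = b"\n"
MAX_LENGTH = 999  # largest value that fits into three decimal digits


def encode_frame(message: bytes) -> bytes:
    """Build b'$LLL:message:CCCC\\n' for the given message bytes."""
    if len(message) > MAX_LENGTH:
        raise ValueError("message too long for a 3-digit length field")
    if DELIMITER in message:
        # Assumption: the delimiter always ends the message field,
        # so it must not appear inside the message itself
        raise ValueError("message must not contain the delimiter")
    length = b"%03d" % len(message)
    checksum = b"%04X" % binascii.crc_hqx(message, 0xFFFF)
    return SYNC + length + DELIMITER + message + DELIMITER + checksum + FRAME_END


frame = encode_frame(b"message arg1 arg2")
```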
A State Machine Model
A model for a parsing state machine is shown below. The state machine serves to describe the frame format and to derive parser code. A language can be defined by its grammar or by a parser specification (machine-language duality).
The parser initially waits for a synchronization character in the Sync state. There are states for the length, message, and checksum fields. The Error state is entered when invalid data is received. Reception of a delimiting character initiates a state transition. For example, the reception of a '$' character makes the state machine transition from the Sync state to the Length state.
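The delimiter-driven transitions described above can also be written down as a lookup table. This is a sketch under simplifying assumptions: field validation, and therefore every transition into the Error state, is left out, and the names `State`, `TRANSITIONS`, `next_state`, and `trace` are illustrative.

```python
import enum


class State(enum.Enum):
    SYNC = enum.auto()
    LENGTH = enum.auto()
    MESSAGE = enum.auto()
    CHECKSUM = enum.auto()
    ERROR = enum.auto()  # not reachable in this sketch (no validation)


# (current state, delimiting byte) -> next state
TRANSITIONS = {
    (State.SYNC, b"$"): State.LENGTH,
    (State.LENGTH, b":"): State.MESSAGE,
    (State.MESSAGE, b":"): State.CHECKSUM,
    (State.CHECKSUM, b"\n"): State.SYNC,
}


def next_state(state: State, byte: bytes) -> State:
    """Return the state after one byte; non-delimiters keep the state."""
    return TRANSITIONS.get((state, byte), state)


def trace(data: bytes):
    """Return the list of states visited while reading data."""
    state = State.SYNC
    visited = [state]
    for i in range(len(data)):
        state = next_state(state, data[i:i + 1])
        visited.append(state)
    return visited
```

For example, `trace(b"ab$017:me")` ends in `State.MESSAGE`: the leading bytes are ignored in the Sync state, '$' switches to Length, and ':' switches to Message.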
Deriving Pseudo-Code
Translating the state machine into a pseudo-code skeleton demonstrates how a state machine may be mapped to typical control structures of imperative programming languages. Emphasis is placed on the states and transitions, leaving implementation details for later refinement.
It should be decided which actions are associated with states and which actions are associated with state transitions.
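Such a skeleton might look like the following, written here in Python rather than pseudo-code. It is a sketch only: validation, the Error state, and checksum verification are omitted, and the class name `SkeletonParser`, the string state names, and the `_enter` helper are assumptions for illustration. Collecting a byte is treated as a state action, while resetting the field buffer is treated as a transition action.

```python
class SkeletonParser:
    """Skeleton only: no validation and no checksum verification."""

    def __init__(self):
        self.state = "SYNC"
        self.field = bytearray()
        self.fields = []

    def _enter(self, state):
        # Transition action: initialize what the next state uses
        self.state = state
        self.field = bytearray()

    def next_byte(self, byte: bytes) -> bool:
        """Consume one byte; return True when a frame end was seen."""
        frame_complete = False
        if self.state == "SYNC":
            if byte == b"$":
                self.fields = []
                self._enter("LENGTH")
            # State action for SYNC: ignore any other byte
        elif self.state == "LENGTH":
            if byte == b":":
                self.fields.append(bytes(self.field))
                self._enter("MESSAGE")
            else:
                self.field += byte  # state action: collect length digits
        elif self.state == "MESSAGE":
            if byte == b":":
                self.fields.append(bytes(self.field))
                self._enter("CHECKSUM")
            else:
                self.field += byte  # state action: collect message bytes
        elif self.state == "CHECKSUM":
            if byte == b"\n":
                self.fields.append(bytes(self.field))
                self._enter("SYNC")
                frame_complete = True
            else:
                self.field += byte  # state action: collect checksum digits
        return frame_complete


parser = SkeletonParser()
data = b"noise$003:abc:1234\n"
complete = [parser.next_byte(data[i:i + 1]) for i in range(len(data))]
```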
Actual Program Code (Python)
An actual program that parses the described frame format is listed below. The structure of the next_byte function resembles the pseudo-code above but includes additional checks to detect length mismatch errors. It is important to ensure during each state transition that the variables used by the next state are initialized.
```python
#!/usr/bin/env python3
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""Parser for a simple text-based frame format"""

import binascii
import enum
import string


class ProtocolParser:

    class ProtocolState(enum.Enum):
        SYNC = 0
        LENGTH = 1
        MESSAGE = 2
        CHECKSUM = 3
        ERROR = 4

    SYNC_CHARACTER = b'$'
    MESSAGE_DELIMITER = b':'
    FRAME_END = b'\n'

    EXPECTED_LENGTH_BYTES = 3
    EXPECTED_CHECKSUM_BYTES = 4

    ERROR_LENGTH_TOO_SHORT = "Length field is too short"
    ERROR_LENGTH_TOO_LONG = "Length field is too long"
    ERROR_INVALID_LENGTH_BYTE = "Invalid length field byte"
    ERROR_MESSAGE_TOO_SHORT = "Message field is too short"
    ERROR_MESSAGE_TOO_LONG = "Message field is too long"
    ERROR_CHECKSUM_TOO_SHORT = "Checksum field is too short"
    ERROR_CHECKSUM_TOO_LONG = "Checksum field is too long"
    ERROR_INVALID_CHECKSUM_BYTE = "Invalid checksum field byte"
    ERROR_CHECKSUM_MISMATCH = "Checksums do not match"

    def __init__(self):
        self.state = self.ProtocolState.SYNC
        self.length = 0
        self.message = bytearray()
        self.checksum = bytearray()
        self.error = ""
        self.bytes_left = 0

    def next_byte(self, byte):
        message_received = False

        if self.state == self.ProtocolState.SYNC:
            # Waiting for synchronization character
            if byte == self.SYNC_CHARACTER:
                # Expect length information
                self.state = self.ProtocolState.LENGTH
                self.bytes_left = self.EXPECTED_LENGTH_BYTES
                self.length = 0
            else:
                # Ignore non-message byte
                pass

        elif self.state == self.ProtocolState.LENGTH:
            if byte == self.MESSAGE_DELIMITER:
                if self.bytes_left > 0:
                    # Expected more length bytes
                    self.state = self.ProtocolState.ERROR
                    self.error = self.ERROR_LENGTH_TOO_SHORT
                else:
                    # Expect message field
                    self.state = self.ProtocolState.MESSAGE
                    self.bytes_left = self.length
                    self.message = bytearray()
            else:
                if self.bytes_left > 0:
                    self.bytes_left -= 1
                    if byte.isdigit():
                        self.length = self.length * 10 + int(byte)
                    else:
                        self.state = self.ProtocolState.ERROR
                        self.error = self.ERROR_INVALID_LENGTH_BYTE
                else:
                    # Expected less length bytes
                    self.state = self.ProtocolState.ERROR
                    self.error = self.ERROR_LENGTH_TOO_LONG

        elif self.state == self.ProtocolState.MESSAGE:
            if byte == self.MESSAGE_DELIMITER:
                if self.bytes_left > 0:
                    # Expected more message bytes
                    self.state = self.ProtocolState.ERROR
                    self.error = self.ERROR_MESSAGE_TOO_SHORT
                else:
                    # Expect checksum field
                    self.state = self.ProtocolState.CHECKSUM
                    self.bytes_left = self.EXPECTED_CHECKSUM_BYTES
                    self.checksum = bytearray()
            else:
                if self.bytes_left > 0:
                    self.bytes_left -= 1
                    self.message += byte
                else:
                    # Expected less message bytes
                    self.state = self.ProtocolState.ERROR
                    self.error = self.ERROR_MESSAGE_TOO_LONG

        elif self.state == self.ProtocolState.CHECKSUM:
            if byte == self.FRAME_END:
                if self.bytes_left > 0:
                    # Expected more checksum bytes
                    self.state = self.ProtocolState.ERROR
                    self.error = self.ERROR_CHECKSUM_TOO_SHORT
                else:
                    message_checksum = binascii.crc_hqx(self.message, 0xFFFF)
                    if message_checksum != int(self.checksum, base=16):
                        self.state = self.ProtocolState.ERROR
                        self.error = self.ERROR_CHECKSUM_MISMATCH
                    else:
                        # Message complete
                        message_received = True
                        # Wait for next message
                        self.state = self.ProtocolState.SYNC
            else:
                def ishexdigit(b):
                    return b in string.hexdigits.encode()

                if self.bytes_left > 0:
                    self.bytes_left -= 1
                    if ishexdigit(byte):
                        self.checksum += byte
                    else:
                        self.state = self.ProtocolState.ERROR
                        self.error = self.ERROR_INVALID_CHECKSUM_BYTE
                else:
                    # Expected less checksum bytes
                    self.state = self.ProtocolState.ERROR
                    self.error = self.ERROR_CHECKSUM_TOO_LONG

        if self.state == self.ProtocolState.ERROR:
            # Simplistic error handling:
            # Print error and wait for the next message
            print(f"A protocol error occurred: {self.error}")
            self.state = self.ProtocolState.SYNC

        return message_received


def parse_data(data):
    p = ProtocolParser()
    for i in range(len(data)):
        # Slicing yields a bytes object of length 1 (indexing would yield an int)
        if p.next_byte(data[i:i+1]):
            print(f"Decoded: {p.message}")


if __name__ == "__main__":
    # A series of bytes that contains message frames, some with errors.
    # Achieves 100% statement coverage.
    test_data = bytes(
        b'ab$017:message arg1 arg2:7CD8\n'
        b'cd$17:message arg1 arg2:7CD8\n'
        b'ef$0017:message arg1 arg2:7CD8\n'
        b'gh$A01:message arg1 arg2:7CD8\n'
        b'ij$016:message arg1 arg2:7CD8\n'
        b'kl$018:message arg1 arg2:7CD8\n'
        b'mn$017:message arg1 arg2:7CD9\n'
        b'op$017:message arg1 arg2:7CD\n'
        b'qr$017:message arg1 arg2:07CD8\n'
        b'st$017:message arg1 arg2:ZCD8\n'
        b'uv$017:message arg1 arg2:7CD8\n'
    )
    parse_data(test_data)
```
When run, the program outputs the following text.
References & Further Reading
[1] https://www.faa.gov/aircraft/air_cert/design_approvals/air_software/media/TC-14-49.pdf
[2] https://github.com/tpircher/pycrc