Parsing Text-Based Protocols

Text-based communication protocols are often used for command and control interfaces over UART in embedded applications. The protocols used by many devices do not follow the layering of the OSI model; instead, they combine the functions of several layers, from the data link layer to the application layer, into a single protocol definition.

Unfortunately, many of the protocols that I have seen are poorly thought out: they implement weak checksums or lack proper delimitation.

Error Handling

The type of error handling that is used depends on the application. It may be sufficient to simply discard erroneous messages or to print an error message, but it may also be necessary to buffer frames so that they can be retransmitted on request.

In any case, it is important to consider all types of errors that may occur (wrong checksums, wrong format, wrong synchronization, …) so that the software module handling the frames does not enter an unexpected state.

A Sample Framing Format

Let's implement a framing format for text messages that incorporates the mentioned characteristics.

A message frame

The frame format has a length field, a message field, and a checksum field. A frame starts with the synchronization character ('$'). The length, message, and checksum fields are separated by delimiters (':'). Although a delimiter is not required when the field lengths are known, delimiters improve readability and allow variable-length fields to be introduced later. A frame is terminated with a linefeed ('\n') character.
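
To illustrate the sender side, the following minimal sketch builds a frame in this format. The function name build_frame is hypothetical. The field widths (three decimal length digits, four hexadecimal checksum digits) and the checksum algorithm (binascii.crc_hqx with an initial value of 0xFFFF) are assumptions chosen to match the Python parser listed later in this article.

```python
import binascii


def build_frame(message: bytes) -> bytes:
    """Wrap message in a frame: $LLL:message:CCCC followed by a linefeed.

    Assumptions (hypothetical helper, not part of the original program):
    a 3-digit decimal length field and a 4-digit hexadecimal CRC field.
    """
    if len(message) > 999:
        raise ValueError("message does not fit a 3-digit length field")
    if b':' in message:
        # The delimiter would be mistaken for the end of the message field
        raise ValueError("message must not contain the delimiter")

    checksum = binascii.crc_hqx(message, 0xFFFF)
    return b'$%03d:%s:%04X\n' % (len(message), message, checksum)


if __name__ == "__main__":
    print(build_frame(b'message arg1 arg2'))
```

Rejecting messages that contain ':' is one possible design choice. A protocol that has to carry arbitrary payloads would need an escaping scheme or would have to rely on the length field alone.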

A State Machine Model

A model for a parsing state machine is shown below. The state machine serves to describe the frame format and to derive parser code. A language can be defined by its grammar or by a parser specification (machine-language duality).

Frame format state machine model

The parser initially waits for a synchronization character in the Sync state. There are states for the length, message, and checksum fields. The Error state is entered when invalid data is received. Reception of a delimiting character initiates a state transition. For example, the reception of a '$' character makes the state machine transition from the Sync state to the Length state.
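
The delimiter-driven transitions described above can be written down as a small lookup table. This is only a sketch: the names State, TRANSITIONS, and next_state are hypothetical, and the Error state and all field validation are deliberately left out so that only the "happy path" of the state machine is shown.

```python
import enum


class State(enum.Enum):
    SYNC = enum.auto()
    LENGTH = enum.auto()
    MESSAGE = enum.auto()
    CHECKSUM = enum.auto()


# (current state, delimiting byte) -> next state
TRANSITIONS = {
    (State.SYNC, b'$'): State.LENGTH,
    (State.LENGTH, b':'): State.MESSAGE,
    (State.MESSAGE, b':'): State.CHECKSUM,
    (State.CHECKSUM, b'\n'): State.SYNC,
}


def next_state(state, byte):
    """Return the state after receiving byte.

    Bytes that are not a delimiter for the current state leave it unchanged.
    """
    return TRANSITIONS.get((state, byte), state)


if __name__ == "__main__":
    state = State.SYNC
    data = b'$003:abc:0000\n'
    for i in range(len(data)):
        state = next_state(state, data[i:i+1])
        print(data[i:i+1], state.name)
```

A table like this documents the transitions compactly, but a real parser still has to attach actions and error checks to each transition.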

Deriving Pseudo-Code

Translating the state machine into a pseudo-code skeleton demonstrates how a state machine may be mapped to typical control structures of imperative programming languages. Emphasis is placed on the states and transitions, leaving implementation details for later refinement.

It should be decided which actions are associated with states and which actions are associated with state transitions.

procedure Process_Byte(Received : Byte) is
   case State is
      when Sync =>
         if Received = '$' then
            Next_State := Length;
         else
            -- Ignore received byte, not part of a frame
         end if;

      when Length =>
         if Received = ':' then
            if 0 <= Length_Received <= 999 then
               Next_State := Message;
            else
               Next_State := Error;
            end if;
         else
            if Too_Many_Bytes then
               Next_State := Error;
            else
               -- Store Received as part of the length field
            end if;
         end if;

      when Message =>
         if Received = ':' then
            if Message_Received.Length = Length then
               Next_State := Checksum;
            else
               Next_State := Error;
            end if;
         else
            if Too_Many_Bytes then
               Next_State := Error;
            else
               -- Store Received as part of the message field
            end if;
         end if;

      when Checksum =>
         if Received = LF then
            if Checksum(Message) = Received_Checksum then
               Process_Message(Message);
               Next_State := Sync;
            else
               Next_State := Error;
            end if;
         else
            if Too_Many_Bytes then
               Next_State := Error;
            else
               -- Store Received as part of the checksum field
            end if;
         end if;
   end case;

   if Next_State = Error then
      -- Handle the error and return to Sync state immediately
      Next_State := Sync;
   end if;
end Process_Byte;

Actual Program Code (Python)

An actual program that parses the described frame format is listed below. The structure of the next_byte function resembles the pseudo-code above but includes additional checks to detect length mismatch errors. It is important to ensure during each state transition that the variables used by the next state are initialized.

protocol/protocol.py

#!/usr/bin/env python3
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""Parser for a simple text-based frame format"""

import binascii
import enum
import string


class ProtocolParser:
    class ProtocolState(enum.Enum):
        SYNC = 0
        LENGTH = 1
        MESSAGE = 2
        CHECKSUM = 3
        ERROR = 4

    SYNC_CHARACTER = b'$'
    MESSAGE_DELIMITER = b':'
    FRAME_END = b'\n'

    EXPECTED_LENGTH_BYTES = 3
    EXPECTED_CHECKSUM_BYTES = 4

    ERROR_LENGTH_TOO_SHORT = "Length field is too short"
    ERROR_LENGTH_TOO_LONG = "Length field is too long"
    ERROR_INVALID_LENGTH_BYTE = "Invalid length field byte"

    ERROR_MESSAGE_TOO_SHORT = "Message field is too short"
    ERROR_MESSAGE_TOO_LONG = "Message field is too long"

    ERROR_CHECKSUM_TOO_SHORT = "Checksum field is too short"
    ERROR_CHECKSUM_TOO_LONG = "Checksum field is too long"
    ERROR_INVALID_CHECKSUM_BYTE = "Invalid checksum field byte"
    ERROR_CHECKSUM_MISMATCH = "Checksums do not match"

    def __init__(self):
        self.state = self.ProtocolState.SYNC
        self.length = 0
        self.message = bytearray()
        self.checksum = bytearray()
        self.error = ""

        self.bytes_left = 0

    def next_byte(self, byte):
        message_received = False

        if self.state == self.ProtocolState.SYNC:
            # Waiting for synchronization character

            if byte == self.SYNC_CHARACTER:
                # Expect length information
                self.state = self.ProtocolState.LENGTH
                self.bytes_left = self.EXPECTED_LENGTH_BYTES
                self.length = 0
            else:
                # Ignore non-message byte
                pass

        elif self.state == self.ProtocolState.LENGTH:
            if byte == self.MESSAGE_DELIMITER:
                if self.bytes_left > 0:
                    # Expected more length bytes
                    self.state = self.ProtocolState.ERROR
                    self.error = self.ERROR_LENGTH_TOO_SHORT
                else:
                    # Expect message field
                    self.state = self.ProtocolState.MESSAGE
                    self.bytes_left = self.length
                    self.message = bytearray()

            else:
                if self.bytes_left > 0:
                    self.bytes_left -= 1
                    if byte.isdigit():
                        self.length = self.length * 10 + int(byte)
                    else:
                        self.state = self.ProtocolState.ERROR
                        self.error = self.ERROR_INVALID_LENGTH_BYTE
                else:
                    # Expected fewer length bytes
                    self.state = self.ProtocolState.ERROR
                    self.error = self.ERROR_LENGTH_TOO_LONG

        elif self.state == self.ProtocolState.MESSAGE:
            if byte == self.MESSAGE_DELIMITER:
                if self.bytes_left > 0:
                    # Expected more message bytes
                    self.state = self.ProtocolState.ERROR
                    self.error = self.ERROR_MESSAGE_TOO_SHORT
                else:
                    # Expect checksum field
                    self.state = self.ProtocolState.CHECKSUM
                    self.bytes_left = self.EXPECTED_CHECKSUM_BYTES
                    self.checksum = bytearray()
            else:
                if self.bytes_left > 0:
                    self.bytes_left -= 1
                    self.message += byte
                else:
                    # Expected fewer message bytes
                    self.state = self.ProtocolState.ERROR
                    self.error = self.ERROR_MESSAGE_TOO_LONG

        elif self.state == self.ProtocolState.CHECKSUM:
            if byte == self.FRAME_END:
                if self.bytes_left > 0:
                    # Expected more checksum bytes
                    self.state = self.ProtocolState.ERROR
                    self.error = self.ERROR_CHECKSUM_TOO_SHORT
                else:
                    message_checksum = binascii.crc_hqx(self.message, 0xFFFF)
                    if message_checksum != int(self.checksum, base=16):
                        self.state = self.ProtocolState.ERROR
                        self.error = self.ERROR_CHECKSUM_MISMATCH
                    else:
                        # Message complete
                        message_received = True
                        # Wait for next message
                        self.state = self.ProtocolState.SYNC
            else:
                def ishexdigit(b):
                    return b in string.hexdigits.encode()

                if self.bytes_left > 0:
                    self.bytes_left -= 1
                    if ishexdigit(byte):
                        self.checksum += byte
                    else:
                        self.state = self.ProtocolState.ERROR
                        self.error = self.ERROR_INVALID_CHECKSUM_BYTE
                else:
                    # Expected fewer checksum bytes
                    self.state = self.ProtocolState.ERROR
                    self.error = self.ERROR_CHECKSUM_TOO_LONG

        if self.state == self.ProtocolState.ERROR:
            # Simplistic error handling:
            # Print error and wait for the next message
            print(f"A protocol error occurred: {self.error}")
            self.state = self.ProtocolState.SYNC

        return message_received


def parse_data(data):
    p = ProtocolParser()
    for i in range(len(data)):
        # Slice instead of indexing so that the parser receives
        # a bytes object of length 1 rather than an int
        if p.next_byte(data[i:i+1]):
            print(f"Decoded: {p.message}")


if __name__ == "__main__":
    # A series of bytes that contains message frames, some with errors.
    # Achieves 100% statement coverage.
    test_data = bytes(
        b'ab$017:message arg1 arg2:7CD8\n'
        b'cd$17:message arg1 arg2:7CD8\n'
        b'ef$0017:message arg1 arg2:7CD8\n'
        b'gh$A01:message arg1 arg2:7CD8\n'
        b'ij$016:message arg1 arg2:7CD8\n'
        b'kl$018:message arg1 arg2:7CD8\n'
        b'mn$017:message arg1 arg2:7CD9\n'
        b'op$017:message arg1 arg2:7CD\n'
        b'qr$017:message arg1 arg2:07CD8\n'
        b'st$017:message arg1 arg2:ZCD8\n'
        b'uv$017:message arg1 arg2:7CD8\n'
    )

    parse_data(test_data)

When run, the program outputs the following text.

Decoded: bytearray(b'message arg1 arg2')
A protocol error occurred: Length field is too short
A protocol error occurred: Length field is too long
A protocol error occurred: Invalid length field byte
A protocol error occurred: Message field is too long
A protocol error occurred: Message field is too short
A protocol error occurred: Checksums do not match
A protocol error occurred: Checksum field is too short
A protocol error occurred: Checksum field is too long
A protocol error occurred: Invalid checksum field byte
Decoded: bytearray(b'message arg1 arg2')
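
When a complete frame is already available in memory, for example in a test harness, the byte-wise parser can be cross-checked against a whole-frame validator. The sketch below uses a regular expression instead of a state machine, so it is a different technique from the one developed in this article. The names FRAME_PATTERN and check_frame are hypothetical; the field widths and the CRC parameters are assumed to be the same as in the listing above.

```python
import binascii
import re

# $ + 3 decimal digits + : + message without ':' + : + 4 hex digits + LF
FRAME_PATTERN = re.compile(rb'\$(\d{3}):([^:]*):([0-9A-Fa-f]{4})\n')


def check_frame(frame: bytes):
    """Return the message contained in a complete frame, or None if invalid."""
    match = FRAME_PATTERN.fullmatch(frame)
    if match is None:
        return None  # Wrong overall format

    length, message, checksum = match.groups()
    if int(length) != len(message):
        return None  # Length field does not match the message
    if int(checksum, 16) != binascii.crc_hqx(message, 0xFFFF):
        return None  # Checksum mismatch
    return message


if __name__ == "__main__":
    message = b'message arg1 arg2'
    crc = binascii.crc_hqx(message, 0xFFFF)
    frame = b'$%03d:%s:%04X\n' % (len(message), message, crc)
    print(check_frame(frame))
    print(check_frame(frame.replace(b'$017', b'$016')))
```

Unlike the state machine, this approach cannot process data byte by byte as it arrives and does not report which kind of error occurred, so it is better suited to tests than to the receive path of a device.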