From be540bdfd3b5d8723d7937024a216b63ba8974a6 Mon Sep 17 00:00:00 2001 From: Istvan Ruzman Date: Fri, 4 Sep 2020 19:12:35 +0200 Subject: [PATCH] Implement a more generic way to handle different TLV encodings --- src/pyrad3/types.py | 1 + src/pyrad3/utils.py | 249 +++++++++++++++++++++------------------- tests/dictionaries/dict | 2 + 3 files changed, 137 insertions(+), 115 deletions(-) diff --git a/src/pyrad3/types.py b/src/pyrad3/types.py index 6beaad5..052ca57 100644 --- a/src/pyrad3/types.py +++ b/src/pyrad3/types.py @@ -61,6 +61,7 @@ class Datatype(Enum): extended = auto() longextended = auto() evs = auto() + vsa = auto() class Encrypt(IntEnum): diff --git a/src/pyrad3/utils.py b/src/pyrad3/utils.py index cf9081d..e72fe8e 100644 --- a/src/pyrad3/utils.py +++ b/src/pyrad3/utils.py @@ -7,11 +7,11 @@ import hashlib import secrets import struct from collections import namedtuple -from typing import List, Optional, Tuple, Union +from typing import Any, List, Optional, Tuple from pyrad3.dictionary import Dictionary from pyrad3.tools import decode_attr -from pyrad3.types import Attribute as DictAttr, Code, Datatype +from pyrad3.types import Code, Datatype RANDOM_GENERATOR = secrets.SystemRandom() MD5 = hashlib.md5 @@ -24,7 +24,8 @@ class PacketError(Exception): Header = namedtuple("Header", ["code", "radius_id", "length", "authenticator"]) Attribute = namedtuple("Attribute", ["name", "pos", "type", "length", "value"]) -PreParsedAttribute = List[Tuple[int, int, DictAttr]] +PreParsedAttributes = List[Tuple[Tuple[int, ...], bytes, int]] +SpecialTlvDescription = Tuple[Tuple[int, ...], bytes, int] def parse_header(raw_packet: bytes) -> Header: @@ -58,142 +59,160 @@ def parse_attributes( separately. """ attributes = [] - packet = raw_packet[20:] + packet = raw_packet[20:] # Skip RADIUS Header - while packet: + for key, value, offset in pre_parse_attributes(rad_dict, packet): + attr_def = rad_dict.attrindex.get(key) + length = len(value) + dec_value: Any = value # to silence mypy + if attr_def is None: + name = "Unknown-Attribute" + datatype = Datatype.octets + else: + name = attr_def.name + datatype = attr_def.datatype + dec_value = decode_attr(datatype, value) + try: + dec_value = attr_def.values[dec_value] + except KeyError: + pass + attributes.append( + Attribute( + name=name, + pos=offset, + length=length, + type=datatype, + value=dec_value, + ) + ) + + return attributes + + +def pre_parse_attributes( # pylint: disable=too-many-branches + rad_dict: Dictionary, packet_body: bytes +) -> PreParsedAttributes: + """Find Attributes location and keystack""" + attributes = [] + offset = 0 + + while packet_body: try: - (key, length) = struct.unpack("!BB", packet[0:2]) + (key, length) = struct.unpack("!BB", packet_body[0:2]) except struct.error as exc: raise PacketError("Attribute header is corrupt") from exc if length < 2: raise PacketError(f"Attribute length ({length}) is too small") - value = packet[2:length] - offset = len(raw_packet) - len(packet) + length - if key == 26: - try: - attributes.extend( - parse_vendor_attributes(rad_dict, offset, value) - ) - except (PacketError, IndexError): - attributes.append( - Attribute( - name="Unknown-Attribute", - pos=offset, - type=Datatype.octets, - length=int(packet[1]), - value=packet[2:], - ) - ) - else: - try: - attr_def = rad_dict.attrindex[key] - except KeyError as exc: - # TODO - raise exc - attributes.append( - Attribute( - name=attr_def.name, - pos=offset, - type=attr_def.datatype, - length=length - 2, - value=decode_attr(attr_def.datatype, packet[2:length],), - ) - ) - packet = packet[length:] - - return attributes - - -def parse_vendor_attributes( - rad_dict: Dictionary, offset: int, vendor_value: bytes -) -> List[Attribute]: - """Parse A Vendor Attribute""" - if len(vendor_value) < 4: - raise PacketError - vendor_id = int.from_bytes(vendor_value[:4], "big") - vendor_prefix = [26, vendor_id] - - attributes = [] - vendor_tlv = vendor_value[4:] - offset += 4 - while vendor_tlv: + value = packet_body[2:length] try: - (key, attr_length) = struct.unpack("!BB", vendor_tlv[0:2]) - print(attr_length) - keystack = vendor_prefix + [key] - attr_def = rad_dict.attrindex[tuple(keystack)] - except (struct.error, KeyError) as exc: - attributes.append( - Attribute( - name="Unknown-Attribute", - pos=offset - len(vendor_value), - type=Datatype.octets, - length=len(vendor_value) - 4, - value=vendor_value, - ) - ) - if exc is struct.error: - break - else: - if attr_def.datatype == Datatype.tlv: - attr_pos = _parse_tlv(rad_dict, vendor_tlv, 2, keystack) + attr_def = rad_dict.attrindex[key] + if attr_def.datatype == Datatype.vsa: + tmp_attributes = decode_vsa(key, value, offset) else: - attr_pos = [(2, attr_length, attr_def)] - - parsed_attributes: List[Attribute] = [] - for local_offset, length, attr_def in attr_pos: - length -= 2 - parsed_attributes.append( - Attribute( - name=attr_def.name, - pos=offset + local_offset, - type=attr_def.datatype, - length=length, - value=decode_attr( - attr_def.datatype, - vendor_tlv[local_offset : local_offset + length], - ), + if attr_def.datatype == Datatype.extended: + key, value, modifier = decode_extended(key, value, offset) + elif attr_def.datatype == Datatype.longextended: + key, value, modifier = decode_longextended( + key, value, offset ) - ) + elif attr_def.datatype == Datatype.evs: + # Shouldn't this be part of extended/longextended? + key, value, modifier = decode_evs(key, value, offset) + elif attr_def.datatype == Datatype.concat: + key, value, modifier = decode_concat(key, value, offset) + else: + modifier = 2 + # Redundand in the "normal" case + tmp_attributes = [(key, value, offset + modifier,)] + except (KeyError, IndexError): + # We do not know the TLV, but the packet seems to be well-formed so far + tmp_attributes = [(key, value, offset + 2)] - offset = offset + attr_length - attributes.extend(parsed_attributes) - vendor_tlv = vendor_tlv[attr_length:] + for key, value, offset in tmp_attributes: + adef = rad_dict.attrindex.get(key) + if adef is not None and adef.datatype == Datatype.tlv: + attributes.extend( + decode_tlv(rad_dict, list(key), value, offset) + ) + else: + attributes.append((key, value, offset)) + offset += length + packet_body = packet_body[length:] return attributes -def _parse_tlv( - rad_dict: Dictionary, block: bytes, local_offset: int, key_stack: List[int] -) -> PreParsedAttribute: +def decode_vsa(key: int, value: bytes, offset: int) -> PreParsedAttributes: + """Decode a TLV of type VSA (Vendor-Specific-Attribute)""" + if len(value) < 4: + raise PacketError + vendor_id = int.from_bytes(value[:4], "big") + keystack = [key, vendor_id] + vendor_attributes: PreParsedAttributes = [] + value = value[4:] + offset += 4 + + while value: + try: + (key, length) = struct.unpack("!BB", value[:2]) + except struct.error as exc: + raise PacketError("VSA Attribute Header is corrupt") from exc + if length < 2: + raise PacketError(f"Attribute length({length}) is too small") + + vendor_key = tuple(keystack + [key]) + vendor_attributes.append((vendor_key, value[2:length], offset + 2)) + + offset += length + value = value[length:] + + return vendor_attributes + + +def decode_extended( + key: int, value: bytes, offset: int +) -> SpecialTlvDescription: + """Decode an Attribute of type extended""" + raise NotImplementedError + + +def decode_longextended( + key: int, value: bytes, offset: int +) -> SpecialTlvDescription: + """Decode an Attribute of type long-extended""" + raise NotImplementedError + + +def decode_concat(key: int, value: bytes, offset: int) -> SpecialTlvDescription: + """Decode an Attribute of type concat""" + raise NotImplementedError + + +def decode_evs(key: int, value: bytes, offset: int) -> SpecialTlvDescription: + """Decode an Attribute of type EVS (Extended Vendor Specific)""" + raise NotImplementedError + + +def decode_tlv( + rad_dict: Dictionary, key_stack: List[int], value: bytes, offset: int +) -> PreParsedAttributes: + """(Recursively) Decode an Attribute of type TLV""" # get a list of flattened radius attributes ret = [] - while block: - (key, length) = struct.unpack("!BB", block[0:2]) + while value: + (key, length) = struct.unpack("!BB", value[0:2]) attr_def = rad_dict.attrindex[tuple(key_stack)] if attr_def.datatype == Datatype.tlv: key_stack.append(key) - block = block[:length] - ret.extend( - _parse_tlv(rad_dict, block[2:], local_offset + 2, key_stack) - ) + value = value[:length] + ret.extend(decode_tlv(rad_dict, key_stack, value[2:], offset + 2)) key_stack.pop() else: - ret.append((local_offset + 2, length, attr_def)) - local_offset += length + ret.append((tuple(key_stack), value, offset + 2)) + offset += length return ret -def parse_key( - rad_dict: Dictionary, key_id: Union[int, Tuple[int, ...]] -) -> Union[str, int, Tuple[int, ...]]: - """Parse the key in the Dictionary Context""" - try: - return rad_dict.attrindex[key_id].name - except KeyError: - return key_id - - def calculate_authenticator( secret: bytes, authenticator: bytes, raw_packet: bytes ) -> bytes: diff --git a/tests/dictionaries/dict b/tests/dictionaries/dict index 147bd20..3bfe61d 100644 --- a/tests/dictionaries/dict +++ b/tests/dictionaries/dict @@ -20,6 +20,8 @@ ATTRIBUTE RFC-SPACE-TYPE-EXTENDED 19 extended ATTRIBUTE RFC-SPACE-TYPE-LONG-EXTENDED 20 long-extended ATTRIBUTE RFC-SPACE-TYPE-EVS 21 evs +ATTRIBUTE VENDOR-SPECIFIC 26 vsa + VENDOR TEST 1234 BEGIN-VENDOR TEST