Implement a more generic way to handle different TLV encodings
This commit is contained in:
@@ -61,6 +61,7 @@ class Datatype(Enum):
|
|||||||
extended = auto()
|
extended = auto()
|
||||||
longextended = auto()
|
longextended = auto()
|
||||||
evs = auto()
|
evs = auto()
|
||||||
|
vsa = auto()
|
||||||
|
|
||||||
|
|
||||||
class Encrypt(IntEnum):
|
class Encrypt(IntEnum):
|
||||||
|
|||||||
@@ -7,11 +7,11 @@ import hashlib
|
|||||||
import secrets
|
import secrets
|
||||||
import struct
|
import struct
|
||||||
from collections import namedtuple
|
from collections import namedtuple
|
||||||
from typing import List, Optional, Tuple, Union
|
from typing import Any, List, Optional, Tuple
|
||||||
|
|
||||||
from pyrad3.dictionary import Dictionary
|
from pyrad3.dictionary import Dictionary
|
||||||
from pyrad3.tools import decode_attr
|
from pyrad3.tools import decode_attr
|
||||||
from pyrad3.types import Attribute as DictAttr, Code, Datatype
|
from pyrad3.types import Code, Datatype
|
||||||
|
|
||||||
RANDOM_GENERATOR = secrets.SystemRandom()
|
RANDOM_GENERATOR = secrets.SystemRandom()
|
||||||
MD5 = hashlib.md5
|
MD5 = hashlib.md5
|
||||||
@@ -24,7 +24,8 @@ class PacketError(Exception):
|
|||||||
Header = namedtuple("Header", ["code", "radius_id", "length", "authenticator"])
|
Header = namedtuple("Header", ["code", "radius_id", "length", "authenticator"])
|
||||||
Attribute = namedtuple("Attribute", ["name", "pos", "type", "length", "value"])
|
Attribute = namedtuple("Attribute", ["name", "pos", "type", "length", "value"])
|
||||||
|
|
||||||
PreParsedAttribute = List[Tuple[int, int, DictAttr]]
|
PreParsedAttributes = List[Tuple[Tuple[int, ...], bytes, int]]
|
||||||
|
SpecialTlvDescription = Tuple[Tuple[int, ...], bytes, int]
|
||||||
|
|
||||||
|
|
||||||
def parse_header(raw_packet: bytes) -> Header:
|
def parse_header(raw_packet: bytes) -> Header:
|
||||||
@@ -58,142 +59,160 @@ def parse_attributes(
|
|||||||
separately.
|
separately.
|
||||||
"""
|
"""
|
||||||
attributes = []
|
attributes = []
|
||||||
packet = raw_packet[20:]
|
packet = raw_packet[20:] # Skip RADIUS Header
|
||||||
|
|
||||||
while packet:
|
for key, value, offset in pre_parse_attributes(rad_dict, packet):
|
||||||
|
attr_def = rad_dict.attrindex.get(key)
|
||||||
|
length = len(value)
|
||||||
|
dec_value: Any = value # to silence mypy
|
||||||
|
if attr_def is None:
|
||||||
|
name = "Unknown-Attribute"
|
||||||
|
datatype = Datatype.octets
|
||||||
|
else:
|
||||||
|
name = attr_def.name
|
||||||
|
datatype = attr_def.datatype
|
||||||
|
dec_value = decode_attr(datatype, value)
|
||||||
try:
|
try:
|
||||||
(key, length) = struct.unpack("!BB", packet[0:2])
|
dec_value = attr_def.values[dec_value]
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
|
attributes.append(
|
||||||
|
Attribute(
|
||||||
|
name=name,
|
||||||
|
pos=offset,
|
||||||
|
length=length,
|
||||||
|
type=datatype,
|
||||||
|
value=dec_value,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
return attributes
|
||||||
|
|
||||||
|
|
||||||
|
def pre_parse_attributes( # pylint: disable=too-many-branches
|
||||||
|
rad_dict: Dictionary, packet_body: bytes
|
||||||
|
) -> PreParsedAttributes:
|
||||||
|
"""Find Attributes location and keystack"""
|
||||||
|
attributes = []
|
||||||
|
offset = 0
|
||||||
|
|
||||||
|
while packet_body:
|
||||||
|
try:
|
||||||
|
(key, length) = struct.unpack("!BB", packet_body[0:2])
|
||||||
except struct.error as exc:
|
except struct.error as exc:
|
||||||
raise PacketError("Attribute header is corrupt") from exc
|
raise PacketError("Attribute header is corrupt") from exc
|
||||||
if length < 2:
|
if length < 2:
|
||||||
raise PacketError(f"Attribute length ({length}) is too small")
|
raise PacketError(f"Attribute length ({length}) is too small")
|
||||||
|
|
||||||
value = packet[2:length]
|
value = packet_body[2:length]
|
||||||
offset = len(raw_packet) - len(packet) + length
|
|
||||||
if key == 26:
|
|
||||||
try:
|
|
||||||
attributes.extend(
|
|
||||||
parse_vendor_attributes(rad_dict, offset, value)
|
|
||||||
)
|
|
||||||
except (PacketError, IndexError):
|
|
||||||
attributes.append(
|
|
||||||
Attribute(
|
|
||||||
name="Unknown-Attribute",
|
|
||||||
pos=offset,
|
|
||||||
type=Datatype.octets,
|
|
||||||
length=int(packet[1]),
|
|
||||||
value=packet[2:],
|
|
||||||
)
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
try:
|
try:
|
||||||
attr_def = rad_dict.attrindex[key]
|
attr_def = rad_dict.attrindex[key]
|
||||||
except KeyError as exc:
|
if attr_def.datatype == Datatype.vsa:
|
||||||
# TODO
|
tmp_attributes = decode_vsa(key, value, offset)
|
||||||
raise exc
|
else:
|
||||||
attributes.append(
|
if attr_def.datatype == Datatype.extended:
|
||||||
Attribute(
|
key, value, modifier = decode_extended(key, value, offset)
|
||||||
name=attr_def.name,
|
elif attr_def.datatype == Datatype.longextended:
|
||||||
pos=offset,
|
key, value, modifier = decode_longextended(
|
||||||
type=attr_def.datatype,
|
key, value, offset
|
||||||
length=length - 2,
|
|
||||||
value=decode_attr(attr_def.datatype, packet[2:length],),
|
|
||||||
)
|
)
|
||||||
)
|
elif attr_def.datatype == Datatype.evs:
|
||||||
packet = packet[length:]
|
# Shouldn't this be part of extended/longextended?
|
||||||
|
key, value, modifier = decode_evs(key, value, offset)
|
||||||
|
elif attr_def.datatype == Datatype.concat:
|
||||||
|
key, value, modifier = decode_concat(key, value, offset)
|
||||||
|
else:
|
||||||
|
modifier = 2
|
||||||
|
# Redundand in the "normal" case
|
||||||
|
tmp_attributes = [(key, value, offset + modifier,)]
|
||||||
|
except (KeyError, IndexError):
|
||||||
|
# We do not know the TLV, but the packet seems to be well-formed so far
|
||||||
|
tmp_attributes = [(key, value, offset + 2)]
|
||||||
|
|
||||||
|
for key, value, offset in tmp_attributes:
|
||||||
|
adef = rad_dict.attrindex.get(key)
|
||||||
|
if adef is not None and adef.datatype == Datatype.tlv:
|
||||||
|
attributes.extend(
|
||||||
|
decode_tlv(rad_dict, list(key), value, offset)
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
attributes.append((key, value, offset))
|
||||||
|
offset += length
|
||||||
|
packet_body = packet_body[length:]
|
||||||
return attributes
|
return attributes
|
||||||
|
|
||||||
|
|
||||||
def parse_vendor_attributes(
|
def decode_vsa(key: int, value: bytes, offset: int) -> PreParsedAttributes:
|
||||||
rad_dict: Dictionary, offset: int, vendor_value: bytes
|
"""Decode a TLV of type VSA (Vendor-Specific-Attribute)"""
|
||||||
) -> List[Attribute]:
|
if len(value) < 4:
|
||||||
"""Parse A Vendor Attribute"""
|
|
||||||
if len(vendor_value) < 4:
|
|
||||||
raise PacketError
|
raise PacketError
|
||||||
vendor_id = int.from_bytes(vendor_value[:4], "big")
|
vendor_id = int.from_bytes(value[:4], "big")
|
||||||
vendor_prefix = [26, vendor_id]
|
keystack = [key, vendor_id]
|
||||||
|
vendor_attributes: PreParsedAttributes = []
|
||||||
attributes = []
|
value = value[4:]
|
||||||
vendor_tlv = vendor_value[4:]
|
|
||||||
offset += 4
|
offset += 4
|
||||||
while vendor_tlv:
|
|
||||||
|
while value:
|
||||||
try:
|
try:
|
||||||
(key, attr_length) = struct.unpack("!BB", vendor_tlv[0:2])
|
(key, length) = struct.unpack("!BB", value[:2])
|
||||||
print(attr_length)
|
except struct.error as exc:
|
||||||
keystack = vendor_prefix + [key]
|
raise PacketError("VSA Attribute Header is corrupt") from exc
|
||||||
attr_def = rad_dict.attrindex[tuple(keystack)]
|
if length < 2:
|
||||||
except (struct.error, KeyError) as exc:
|
raise PacketError(f"Attribute length({length}) is too small")
|
||||||
attributes.append(
|
|
||||||
Attribute(
|
|
||||||
name="Unknown-Attribute",
|
|
||||||
pos=offset - len(vendor_value),
|
|
||||||
type=Datatype.octets,
|
|
||||||
length=len(vendor_value) - 4,
|
|
||||||
value=vendor_value,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
if exc is struct.error:
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
if attr_def.datatype == Datatype.tlv:
|
|
||||||
attr_pos = _parse_tlv(rad_dict, vendor_tlv, 2, keystack)
|
|
||||||
else:
|
|
||||||
attr_pos = [(2, attr_length, attr_def)]
|
|
||||||
|
|
||||||
parsed_attributes: List[Attribute] = []
|
vendor_key = tuple(keystack + [key])
|
||||||
for local_offset, length, attr_def in attr_pos:
|
vendor_attributes.append((vendor_key, value[2:length], offset + 2))
|
||||||
length -= 2
|
|
||||||
parsed_attributes.append(
|
|
||||||
Attribute(
|
|
||||||
name=attr_def.name,
|
|
||||||
pos=offset + local_offset,
|
|
||||||
type=attr_def.datatype,
|
|
||||||
length=length,
|
|
||||||
value=decode_attr(
|
|
||||||
attr_def.datatype,
|
|
||||||
vendor_tlv[local_offset : local_offset + length],
|
|
||||||
),
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
offset = offset + attr_length
|
offset += length
|
||||||
attributes.extend(parsed_attributes)
|
value = value[length:]
|
||||||
vendor_tlv = vendor_tlv[attr_length:]
|
|
||||||
return attributes
|
return vendor_attributes
|
||||||
|
|
||||||
|
|
||||||
def _parse_tlv(
|
def decode_extended(
|
||||||
rad_dict: Dictionary, block: bytes, local_offset: int, key_stack: List[int]
|
key: int, value: bytes, offset: int
|
||||||
) -> PreParsedAttribute:
|
) -> SpecialTlvDescription:
|
||||||
|
"""Decode an Attribute of type extended"""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
|
||||||
|
def decode_longextended(
|
||||||
|
key: int, value: bytes, offset: int
|
||||||
|
) -> SpecialTlvDescription:
|
||||||
|
"""Decode an Attribute of type long-extended"""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
|
||||||
|
def decode_concat(key: int, value: bytes, offset: int) -> SpecialTlvDescription:
|
||||||
|
"""Decode an Attribute of type concat"""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
|
||||||
|
def decode_evs(key: int, value: bytes, offset: int) -> SpecialTlvDescription:
|
||||||
|
"""Decode an Attribute of type EVS (Extended Vendor Specific)"""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
|
||||||
|
def decode_tlv(
|
||||||
|
rad_dict: Dictionary, key_stack: List[int], value: bytes, offset: int
|
||||||
|
) -> PreParsedAttributes:
|
||||||
|
"""(Recursively) Decode an Attribute of type TLV"""
|
||||||
# get a list of flattened radius attributes
|
# get a list of flattened radius attributes
|
||||||
ret = []
|
ret = []
|
||||||
while block:
|
while value:
|
||||||
(key, length) = struct.unpack("!BB", block[0:2])
|
(key, length) = struct.unpack("!BB", value[0:2])
|
||||||
attr_def = rad_dict.attrindex[tuple(key_stack)]
|
attr_def = rad_dict.attrindex[tuple(key_stack)]
|
||||||
if attr_def.datatype == Datatype.tlv:
|
if attr_def.datatype == Datatype.tlv:
|
||||||
key_stack.append(key)
|
key_stack.append(key)
|
||||||
block = block[:length]
|
value = value[:length]
|
||||||
ret.extend(
|
ret.extend(decode_tlv(rad_dict, key_stack, value[2:], offset + 2))
|
||||||
_parse_tlv(rad_dict, block[2:], local_offset + 2, key_stack)
|
|
||||||
)
|
|
||||||
key_stack.pop()
|
key_stack.pop()
|
||||||
else:
|
else:
|
||||||
ret.append((local_offset + 2, length, attr_def))
|
ret.append((tuple(key_stack), value, offset + 2))
|
||||||
local_offset += length
|
offset += length
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
|
|
||||||
def parse_key(
|
|
||||||
rad_dict: Dictionary, key_id: Union[int, Tuple[int, ...]]
|
|
||||||
) -> Union[str, int, Tuple[int, ...]]:
|
|
||||||
"""Parse the key in the Dictionary Context"""
|
|
||||||
try:
|
|
||||||
return rad_dict.attrindex[key_id].name
|
|
||||||
except KeyError:
|
|
||||||
return key_id
|
|
||||||
|
|
||||||
|
|
||||||
def calculate_authenticator(
|
def calculate_authenticator(
|
||||||
secret: bytes, authenticator: bytes, raw_packet: bytes
|
secret: bytes, authenticator: bytes, raw_packet: bytes
|
||||||
) -> bytes:
|
) -> bytes:
|
||||||
|
|||||||
@@ -20,6 +20,8 @@ ATTRIBUTE RFC-SPACE-TYPE-EXTENDED 19 extended
|
|||||||
ATTRIBUTE RFC-SPACE-TYPE-LONG-EXTENDED 20 long-extended
|
ATTRIBUTE RFC-SPACE-TYPE-LONG-EXTENDED 20 long-extended
|
||||||
ATTRIBUTE RFC-SPACE-TYPE-EVS 21 evs
|
ATTRIBUTE RFC-SPACE-TYPE-EVS 21 evs
|
||||||
|
|
||||||
|
ATTRIBUTE VENDOR-SPECIFIC 26 vsa
|
||||||
|
|
||||||
VENDOR TEST 1234
|
VENDOR TEST 1234
|
||||||
|
|
||||||
BEGIN-VENDOR TEST
|
BEGIN-VENDOR TEST
|
||||||
|
|||||||
Reference in New Issue
Block a user