Add: evs decoding - no working tests yet
EVS decoding should be mostly working now. We do not have any tests yet, because it is unclear to me how EVS is defined in dictionaries.
This commit is contained in:
@@ -9,9 +9,9 @@ import time
|
||||
from ipaddress import IPv4Address, IPv6Address
|
||||
from typing import Optional, Union, cast
|
||||
|
||||
import pyrad3.host as H
|
||||
import pyrad3.packet as P
|
||||
from pyrad3.dictionary import Dictionary
|
||||
from pyrad3.host import Host
|
||||
|
||||
SUPPORTED_SEND_TYPES = [
|
||||
P.Code.AccessRequest,
|
||||
@@ -34,7 +34,7 @@ class UnsupportedPacketType(Exception):
|
||||
"""Exception for received packets"""
|
||||
|
||||
|
||||
class Client(H.Host):
|
||||
class Client(Host):
|
||||
"""A simple and extensible RADIUS Client."""
|
||||
|
||||
def __init__(
|
||||
|
||||
@@ -126,9 +126,6 @@ def pre_decode_attributes( # pylint: disable=too-many-branches
|
||||
key, value, modifier = decode_extended(key, value)
|
||||
elif attr_def.datatype == Datatype.longextended:
|
||||
key, value, modifier = decode_longextended(key, value)
|
||||
elif attr_def.datatype == Datatype.evs:
|
||||
# Shouldn't this be part of extended/longextended?
|
||||
key, value, modifier = decode_evs(key, value, offset)
|
||||
elif attr_def.datatype == Datatype.concat:
|
||||
key, value, modifier = decode_concat(key, value, offset)
|
||||
else:
|
||||
@@ -141,11 +138,16 @@ def pre_decode_attributes( # pylint: disable=too-many-branches
|
||||
tmp_attributes = [(key, value, offset + 2)]
|
||||
|
||||
for key, value, offset in tmp_attributes:
|
||||
adef = rad_dict.attrindex.get(key)
|
||||
if adef is not None and adef.datatype == Datatype.tlv:
|
||||
try:
|
||||
adef = rad_dict.attrindex[key]
|
||||
if adef.datatype == Datatype.tlv:
|
||||
# TODO: deal with tagged tlvs
|
||||
attributes.extend(decode_tlv(rad_dict, list(key), value, offset))
|
||||
elif adef.datatype == Datatype.evs:
|
||||
attributes.append(decode_evs(key, value, offset))
|
||||
else:
|
||||
raise ValueError
|
||||
except (ValueError, KeyError):
|
||||
attributes.append((key, value, offset))
|
||||
offset += length
|
||||
packet_body = packet_body[length:]
|
||||
@@ -223,14 +225,14 @@ def decode_vsa(
|
||||
def decode_extended(key: int, value: bytes) -> SpecialTlvDescription:
|
||||
"""Decode an Attribute of type extended"""
|
||||
key = (key, value[0])
|
||||
value = value[1:-2]
|
||||
value = value[1:]
|
||||
return (key, value, 3)
|
||||
|
||||
|
||||
def decode_longextended(key: int, value: bytes) -> SpecialTlvDescription:
|
||||
"""Decode an Attribute of type long-extended"""
|
||||
key = (key, value[0])
|
||||
value = value[2:-3]
|
||||
value = value[2:]
|
||||
return (key, value, 4)
|
||||
|
||||
|
||||
@@ -239,9 +241,13 @@ def decode_concat(key: int, value: bytes, offset: int) -> SpecialTlvDescription:
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
def decode_evs(key: int, value: bytes, offset: int) -> SpecialTlvDescription:
|
||||
def decode_evs(key: int, value: bytes, offset: int) -> PreParsedAttributes:
|
||||
"""Decode an Attribute of type EVS (Extended Vendor Specific)"""
|
||||
raise NotImplementedError
|
||||
vendor_id = int.from_bytes(value[:4], "big")
|
||||
vendor_type = value[4]
|
||||
key = tuple(list(key) + [vendor_id, vendor_type])
|
||||
value = value[5:]
|
||||
return (key, value, 5)
|
||||
|
||||
|
||||
def decode_tlv(
|
||||
|
||||
@@ -20,7 +20,8 @@ ATTRIBUTE RFC-SPACE-TYPE-CONCAT 17 concat
|
||||
ATTRIBUTE RFC-SPACE-TYPE-TLV 18 tlv
|
||||
ATTRIBUTE RFC-SPACE-TYPE-EXTENDED 19 extended
|
||||
ATTRIBUTE RFC-SPACE-TYPE-LONG-EXTENDED 20 long-extended
|
||||
ATTRIBUTE RFC-SPACE-TYPE-EVS 21 evs
|
||||
ATTRIBUTE RFC-SPACE-TYPE-EVS 19.21 evs
|
||||
ATTRIBUTE RFC-SPACE-TYPE-LONG-EVS 20.21 evs
|
||||
|
||||
ATTRIBUTE RFC-SPACE-TAGGED-STRING 101 string has_tag
|
||||
ATTRIBUTE RFC-SPACE-TAGGED-OCTETS 102 octets has_tag
|
||||
@@ -39,6 +40,44 @@ ATTRIBUTE RFC-SPACE-TAGGED-COMBOIP 114 comboip has_tag
|
||||
ATTRIBUTE RFC-SPACE-TAGGED-IFID 115 ifid has_tag
|
||||
ATTRIBUTE RFC-SPACE-TAGGED-ETHER 116 ether has_tag
|
||||
|
||||
# TODO How are EVS meant to be specified?
|
||||
# ATTRIBUTE VENDOR10-EVS-TYPE-STRING 19.21.1234.1 string
|
||||
# ATTRIBUTE VENDOR10-EVS-TYPE-OCTETS 19.21.1234.2 octets
|
||||
# ATTRIBUTE VENDOR10-EVS-TYPE-DATE 19.21.1234.3 date
|
||||
# ATTRIBUTE VENDOR10-EVS-TYPE-ABINARY 19.21.1234.4 abinary
|
||||
# ATTRIBUTE VENDOR10-EVS-TYPE-BYTE 19.21.1234.5 byte
|
||||
# ATTRIBUTE VENDOR10-EVS-TYPE-SHORT 19.21.1234.6 short
|
||||
# ATTRIBUTE VENDOR10-EVS-TYPE-INTEGER 19.21.1234.7 integer
|
||||
# ATTRIBUTE VENDOR10-EVS-TYPE-SIGNED 19.21.1234.8 signed
|
||||
# ATTRIBUTE VENDOR10-EVS-TYPE-INTEGER64 19.21.1234.9 integer64
|
||||
# ATTRIBUTE VENDOR10-EVS-TYPE-IPADDR 19.21.1234.10 ipaddr
|
||||
# ATTRIBUTE VENDOR10-EVS-TYPE-IPV4PREFIX 19.21.1234.11 ipv4prefix
|
||||
# ATTRIBUTE VENDOR10-EVS-TYPE-IPV6ADDR 19.21.1234.12 ipv6addr
|
||||
# ATTRIBUTE VENDOR10-EVS-TYPE-IPV6PREFIX 19.21.1234.13 ipv6prefix
|
||||
# ATTRIBUTE VENDOR10-EVS-TYPE-COMBOIP 19.21.1234.14 comboip
|
||||
# ATTRIBUTE VENDOR10-EVS-TYPE-IFID 19.21.1234.15 ifid
|
||||
# ATTRIBUTE VENDOR10-EVS-TYPE-ETHER 19.21.1234.16 ether
|
||||
# ATTRIBUTE VENDOR10-EVS-TYPE-TLV 19.21.1234.18 tlv
|
||||
#
|
||||
# ATTRIBUTE VENDOR10-LONG-EVS-TYPE-STRING 20.21.1234.1 string
|
||||
# ATTRIBUTE VENDOR10-LONG-EVS-TYPE-OCTETS 20.21.1234.2 octets
|
||||
# ATTRIBUTE VENDOR10-LONG-EVS-TYPE-DATE 20.21.1234.3 date
|
||||
# ATTRIBUTE VENDOR10-LONG-EVS-TYPE-ABINARY 20.21.1234.4 abinary
|
||||
# ATTRIBUTE VENDOR10-LONG-EVS-TYPE-BYTE 20.21.1234.5 byte
|
||||
# ATTRIBUTE VENDOR10-LONG-EVS-TYPE-SHORT 20.21.1234.6 short
|
||||
# ATTRIBUTE VENDOR10-LONG-EVS-TYPE-INTEGER 20.21.1234.7 integer
|
||||
# ATTRIBUTE VENDOR10-LONG-EVS-TYPE-SIGNED 20.21.1234.8 signed
|
||||
# ATTRIBUTE VENDOR10-LONG-EVS-TYPE-INTEGER64 20.21.1234.9 integer64
|
||||
# ATTRIBUTE VENDOR10-LONG-EVS-TYPE-IPADDR 20.21.1234.10 ipaddr
|
||||
# ATTRIBUTE VENDOR10-LONG-EVS-TYPE-IPV4PREFIX 20.21.1234.11 ipv4prefix
|
||||
# ATTRIBUTE VENDOR10-LONG-EVS-TYPE-IPV6ADDR 20.21.1234.12 ipv6addr
|
||||
# ATTRIBUTE VENDOR10-LONG-EVS-TYPE-IPV6PREFIX 20.21.1234.13 ipv6prefix
|
||||
# ATTRIBUTE VENDOR10-LONG-EVS-TYPE-COMBOIP 20.21.1234.14 comboip
|
||||
# ATTRIBUTE VENDOR10-LONG-EVS-TYPE-IFID 20.21.1234.15 ifid
|
||||
# ATTRIBUTE VENDOR10-LONG-EVS-TYPE-ETHER 20.21.1234.16 ether
|
||||
# ATTRIBUTE VENDOR10-LONG-EVS-TYPE-TLV 20.21.1234.18 tlv
|
||||
|
||||
|
||||
VENDOR TEST10 1234 format=1,0
|
||||
|
||||
BEGIN-VENDOR TEST10
|
||||
|
||||
@@ -119,8 +119,7 @@ def test_valid_attribute_numbers(number):
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"invalid_number",
|
||||
["1000", "ABCD", "-1", "inf", "INF", "-INF", "2e4", "2.5e3"],
|
||||
"invalid_number", ["1000", "ABCD", "-1", "inf", "INF", "-INF", "2e4", "2.5e3"],
|
||||
)
|
||||
def test_invalid_attribute_numbers(invalid_number):
|
||||
dictionary = StringIO(f"ATTRIBUTE NAME {invalid_number} integer64")
|
||||
@@ -300,8 +299,7 @@ def test_invalid_datatypes_in_vendor_space(datatype):
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"invalid_number",
|
||||
["ABCD", "-1", "inf", "INF", "-INF", "0.1", "2e4", "2.5e3"],
|
||||
"invalid_number", ["ABCD", "-1", "inf", "INF", "-INF", "0.1", "2e4", "2.5e3"],
|
||||
)
|
||||
def test_invalid_value_numbers(invalid_number):
|
||||
dictionary = StringIO(
|
||||
|
||||
@@ -148,10 +148,34 @@ def flattened_product(l1, l2):
|
||||
return result
|
||||
|
||||
|
||||
def transform_attr_to_extended_evs(attributes):
|
||||
vendor_id = (1234).to_bytes(4, "big")
|
||||
new_attrs = []
|
||||
for attr, result in attributes:
|
||||
length = (attr[1] + 6).to_bytes(1, "big")
|
||||
orig_type = attr[0].to_bytes(1, "big")
|
||||
attr = b"\x13" + length + b"\x15" + vendor_id + orig_type + attr[2:]
|
||||
new_attrs.append((attr, result))
|
||||
return new_attrs
|
||||
|
||||
|
||||
def transform_attr_to_longextended_evs(attributes):
|
||||
vendor_id = (1234).to_bytes(4, "big")
|
||||
new_attrs = []
|
||||
for attr, result in attributes:
|
||||
length = (attr[1] + 7).to_bytes(1, "big")
|
||||
orig_type = attr[0].to_bytes(1, "big")
|
||||
attr = b"\x14" + length + b"\x15" + b"\x00" + vendor_id + orig_type + attr[2:]
|
||||
new_attrs.append((attr, result))
|
||||
return new_attrs
|
||||
|
||||
|
||||
VENDOR_TEST_ATTRIBUTES = flattened_product(VENDOR_FORMAT_COMBINATIONS, TEST_ATTRIBUTES)
|
||||
VENDOR_TAGGED_ATTRIBUTES = flattened_product(
|
||||
VENDOR_FORMAT_COMBINATIONS, TAGGED_ATTRIBUTES
|
||||
)
|
||||
VENDOR_EXTENDED_EVS_ATTRIBUTES = transform_attr_to_extended_evs(TEST_ATTRIBUTES)
|
||||
VENDOR_LONGEXTENDED_EVS_ATTRIBUTES = transform_attr_to_longextended_evs(TEST_ATTRIBUTES)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@@ -191,6 +215,24 @@ def test_decode_attribute_rfc_tagged(radius_dictionary, attr_bytes, expected):
|
||||
assert attrs[0].tag == 1
|
||||
|
||||
|
||||
@pytest.mark.parametrize("attr_bytes, expected", VENDOR_EXTENDED_EVS_ATTRIBUTES)
|
||||
def test_decode_extended_evs(radius_dictionary, attr_bytes, expected):
|
||||
raw_packet = bytes(20) + attr_bytes
|
||||
attrs = utils.decode_attributes(radius_dictionary, raw_packet)
|
||||
assert len(attrs) == 1
|
||||
assert attrs[0].value == expected
|
||||
assert attrs[0].tag == 0
|
||||
|
||||
|
||||
@pytest.mark.parametrize("attr_bytes, expected", VENDOR_LONGEXTENDED_EVS_ATTRIBUTES)
|
||||
def test_decode_longextended_evs(radius_dictionary, attr_bytes, expected):
|
||||
raw_packet = bytes(20) + attr_bytes
|
||||
attrs = utils.decode_attributes(radius_dictionary, raw_packet)
|
||||
assert len(attrs) == 1
|
||||
assert attrs[0].value == expected
|
||||
assert attrs[0].tag == 0
|
||||
|
||||
|
||||
def generate_vendor_attribute(vendor_id, tlen, llen, attr_bytes):
|
||||
vendor_id = vendor_id.to_bytes(4, "big")
|
||||
vsa_length = (4 + tlen + llen + len(attr_bytes)).to_bytes(1, "big")
|
||||
|
||||
Reference in New Issue
Block a user