diff --git a/src/pyrad3/tools.py b/src/pyrad3/tools.py index 82e993f..678b40d 100644 --- a/src/pyrad3/tools.py +++ b/src/pyrad3/tools.py @@ -215,9 +215,9 @@ def decode_ascend_binary(string: bytes): raise NotImplementedError -def decode_integer(num: bytes, struct_format="!I") -> int: +def decode_integer(num: bytes, signed=False) -> int: """Decode a RADIUS value of some integer type""" - return (struct.unpack(struct_format, num))[0] + return int.from_bytes(num, byteorder="big", signed=signed) def decode_date(num: bytes) -> int: # TODO: type @@ -264,11 +264,11 @@ DECODE_MAP: Dict[Datatype, Callable[[bytes], Any]] = { # TODO: length check (8) Datatype.ifid: decode_octets, Datatype.abinary: decode_ascend_binary, - Datatype.byte: lambda value: decode_integer(value, "!B"), - Datatype.short: lambda value: decode_integer(value, "!H"), - Datatype.signed: lambda value: decode_integer(value, "!i"), + Datatype.byte: decode_integer, + Datatype.short: decode_integer, + Datatype.signed: lambda num: decode_integer(num, True), Datatype.integer: decode_integer, - Datatype.integer64: lambda value: decode_integer(value, "!Q"), + Datatype.integer64: decode_integer, Datatype.date: decode_date, } diff --git a/src/pyrad3/utils.py b/src/pyrad3/utils.py index caea388..53d7af4 100644 --- a/src/pyrad3/utils.py +++ b/src/pyrad3/utils.py @@ -22,7 +22,9 @@ class PacketError(Exception): Header = namedtuple("Header", ["code", "radius_id", "length", "authenticator"]) -Attribute = namedtuple("Attribute", ["name", "pos", "type", "length", "value"]) +Attribute = namedtuple( + "Attribute", ["name", "pos", "type", "length", "tag", "value"] +) PreParsedAttributes = List[Tuple[Tuple[int, ...], bytes, int]] SpecialTlvDescription = Tuple[Tuple[int, ...], bytes, int] @@ -65,12 +67,17 @@ def parse_attributes( attr_def = rad_dict.attrindex.get(key) length = len(value) dec_value: Any = value # to silence mypy + tag = 0 if attr_def is None: name = "Unknown-Attribute" datatype = Datatype.octets else: name = attr_def.name datatype = attr_def.datatype + print(attr_def) + if attr_def.has_tag: + tag = value[0] + value = value[1:] dec_value = decode_attr(datatype, value) try: dec_value = attr_def.values[dec_value] @@ -82,6 +89,7 @@ def parse_attributes( pos=offset, length=length, type=datatype, + tag=tag, value=dec_value, ) ) @@ -133,6 +141,7 @@ def pre_parse_attributes( # pylint: disable=too-many-branches for key, value, offset in tmp_attributes: adef = rad_dict.attrindex.get(key) if adef is not None and adef.datatype == Datatype.tlv: + # TODO: deal with tagged tlvs attributes.extend( decode_tlv(rad_dict, list(key), value, offset) ) diff --git a/tests/dictionaries/dict b/tests/dictionaries/dict index 3bfe61d..22e66ad 100644 --- a/tests/dictionaries/dict +++ b/tests/dictionaries/dict @@ -1,3 +1,5 @@ +ATTRIBUTE VENDOR-SPECIFIC 26 vsa + ATTRIBUTE RFC-SPACE-TYPE-STRING 1 string ATTRIBUTE RFC-SPACE-TYPE-OCTETS 2 octets ATTRIBUTE RFC-SPACE-TYPE-DATE 3 date @@ -20,7 +22,22 @@ ATTRIBUTE RFC-SPACE-TYPE-EXTENDED 19 extended ATTRIBUTE RFC-SPACE-TYPE-LONG-EXTENDED 20 long-extended ATTRIBUTE RFC-SPACE-TYPE-EVS 21 evs -ATTRIBUTE VENDOR-SPECIFIC 26 vsa +ATTRIBUTE RFC-SPACE-TAGGED-STRING 101 string has_tag +ATTRIBUTE RFC-SPACE-TAGGED-OCTETS 102 octets has_tag +ATTRIBUTE RFC-SPACE-TAGGED-DATE 103 date has_tag +ATTRIBUTE RFC-SPACE-TAGGED-ABINARY 104 abinary has_tag +ATTRIBUTE RFC-SPACE-TAGGED-BYTE 105 byte has_tag +ATTRIBUTE RFC-SPACE-TAGGED-SHORT 106 short has_tag +ATTRIBUTE RFC-SPACE-TAGGED-INTEGER 107 integer has_tag +ATTRIBUTE RFC-SPACE-TAGGED-SIGNED 108 signed has_tag +ATTRIBUTE RFC-SPACE-TAGGED-INTEGER64 109 integer64 has_tag +ATTRIBUTE RFC-SPACE-TAGGED-IPADDR 110 ipaddr has_tag +ATTRIBUTE RFC-SPACE-TAGGED-IPV4PREFIX 111 ipv4prefix has_tag +ATTRIBUTE RFC-SPACE-TAGGED-IPV6ADDR 112 ipv6addr has_tag +ATTRIBUTE RFC-SPACE-TAGGED-IPV6PREFIX 113 ipv6prefix has_tag +ATTRIBUTE RFC-SPACE-TAGGED-COMBOIP 114 comboip has_tag +ATTRIBUTE RFC-SPACE-TAGGED-IFID 115 ifid has_tag +ATTRIBUTE RFC-SPACE-TAGGED-ETHER 116 ether has_tag VENDOR TEST 1234 @@ -42,5 +59,24 @@ ATTRIBUTE VENDOR-TYPE-COMBOIP 14 comboip ATTRIBUTE VENDOR-TYPE-IFID 15 ifid ATTRIBUTE VENDOR-TYPE-ETHER 16 ether ATTRIBUTE VENDOR-TYPE-TLV 18 tlv + +ATTRIBUTE VENDOR-TAGGED-STRING 101 string has_tag +ATTRIBUTE VENDOR-TAGGED-OCTETS 102 octets has_tag +ATTRIBUTE VENDOR-TAGGED-DATE 103 date has_tag +ATTRIBUTE VENDOR-TAGGED-ABINARY 104 abinary has_tag +ATTRIBUTE VENDOR-TAGGED-BYTE 105 byte has_tag +ATTRIBUTE VENDOR-TAGGED-SHORT 106 short has_tag +ATTRIBUTE VENDOR-TAGGED-INTEGER 107 integer has_tag +ATTRIBUTE VENDOR-TAGGED-SIGNED 108 signed has_tag +ATTRIBUTE VENDOR-TAGGED-INTEGER64 109 integer64 has_tag +ATTRIBUTE VENDOR-TAGGED-IPADDR 110 ipaddr has_tag +ATTRIBUTE VENDOR-TAGGED-IPV4PREFIX 111 ipv4prefix has_tag +ATTRIBUTE VENDOR-TAGGED-IPV6ADDR 112 ipv6addr has_tag +ATTRIBUTE VENDOR-TAGGED-IPV6PREFIX 113 ipv6prefix has_tag +ATTRIBUTE VENDOR-TAGGED-COMBOIP 114 comboip has_tag +ATTRIBUTE VENDOR-TAGGED-IFID 115 ifid has_tag +ATTRIBUTE VENDOR-TAGGED-ETHER 116 ether has_tag + + END-VENDOR TEST diff --git a/tests/test_dictionary.py b/tests/test_dictionary.py index ed5b9d1..98c4ec3 100644 --- a/tests/test_dictionary.py +++ b/tests/test_dictionary.py @@ -6,6 +6,7 @@ from io import StringIO import pytest from pyrad3.dictionary import Dictionary, ParseError +from pyrad3.types import Encrypt @pytest.mark.parametrize( @@ -358,7 +359,8 @@ def test_unimplemented_tlvs(): @pytest.mark.parametrize("flag", [1, 2, 3]) def test_valid_attribute_encrpytion_flags(flag): dictionary = StringIO(f"ATTRIBUTE NAME 123 octets encrypt={flag}") - Dictionary("", dictionary) + rad_dict = Dictionary("", dictionary) + assert rad_dict.attrindex[123].encrypt == Encrypt(flag) @pytest.mark.parametrize("flag", ["0.1", "0", "4", "0x1", "0o2", "user", ""]) @@ -370,7 +372,8 @@ def test_invalid_attribute_encrpytion_flags(flag): def test_has_tag_flag(): dictionary = StringIO("ATTRIBUTE NAME 123 octets has_tag") - Dictionary("", dictionary) + rad_dict = Dictionary("", dictionary) + assert rad_dict.attrindex[123].has_tag @pytest.mark.parametrize("invalid_flag", ["blablub", "encrypt=2=2", "concat"]) diff --git a/tests/test_parse_header.py b/tests/test_parse_header.py index 5a5bdf7..867b868 100644 --- a/tests/test_parse_header.py +++ b/tests/test_parse_header.py @@ -16,6 +16,105 @@ from pyrad3 import dictionary, utils SECRET = b"secret" +def num_tlv(num_type, num, length, expected=None): + exp = num if expected is None else expected + return (num_type + num.to_bytes(length, "big"), exp) + + +TEST_ATTRIBUTES = [ + (b"\x01\x07ABCDE", "ABCDE"), # rfc string + (b"\x02\x07ABCDE", b"ABCDE"), # rfc octets + (b"\x03\x06\0\0\0\0", 0), # rfc date + # TODO: ABINARY + (b"\x05\x03\x00", 0), # rfc byte + num_tlv(b"\x06\x04", 0, 2), # rfc short + num_tlv(b"\x06\x04", 0xFF, 2), # rfc short + num_tlv(b"\x06\x04", 0x100, 2), # rfc short + num_tlv(b"\x06\x04", 0xFFFF, 2), # rfc short + num_tlv(b"\x07\x06", 0, 4), # rfc integer + num_tlv(b"\x07\x06", 0xFF, 4), # rfc integer + num_tlv(b"\x07\x06", 0x100, 4), # rfc integer + num_tlv(b"\x07\x06", 0xFFFF, 4), # rfc integer + num_tlv(b"\x07\x06", 0x10000, 4), # rfc integer + num_tlv(b"\x07\x06", 0xFFFFFFFF, 4), # rfc integer + num_tlv(b"\x08\x06", 0, 4), # rfc signed + num_tlv(b"\x08\x06", 0xFF, 4), # rfc signed + num_tlv(b"\x08\x06", 0x1000, 4), # rfc signed + num_tlv(b"\x08\x06", 0xFFFF, 4), # rfc signed + num_tlv(b"\x08\x06", 0x10000, 4), # rfc signed + num_tlv(b"\x08\x06", 0xFFFFFFFF, 4, -1), # rfc signed + num_tlv(b"\x08\x06", 0x80000000, 4, -2147483648), # rfc signed + num_tlv(b"\x08\x06", 0x7FFFFFFF, 4, 2147483647), # rfc signed + num_tlv(b"\x09\x0A", 0, 8), # rfc integer64 + num_tlv(b"\x09\x0A", 0xFF, 8), # rfc integer64 + num_tlv(b"\x09\x0A", 0x100, 8), # rfc integer64 + num_tlv(b"\x09\x0A", 0xFFFF, 8), # rfc integer64 + num_tlv(b"\x09\x0A", 0x10000, 8), # rfc integer64 + num_tlv(b"\x09\x0A", 0xFFFFFFFF, 8), # rfc integer64 + num_tlv(b"\x09\x0A", 0x100000000, 8), # rfc integer64 + num_tlv(b"\x09\x0A", 0xFFFFFFFFFFFFFFFF, 8), # rfc integer64 + (b"\x0a\x06\xc0\xa8\x01\x08", IPv4Address("192.168.1.8")), + (b"\x0b\x07\x10\xc0\xa8\x00\x00", IPv4Network("192.168.0.0/16")), + ( + b"\x0c\x12\x20\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01", + IPv6Address("2003::1"), + ), + ( + b"\x0d\x14\x00\x40\x20\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", + IPv6Network("2003::0/64"), + ), + (b"\x0c\x04\x20\x03", IPv6Address("2003::0")), + (b"\x0d\x06\x00\x40\x20\x03", IPv6Network("2003::0/64")), +] + +TAGGED_ATTRIBUTES = [ + (b"\x65\x08\x01ABCDE", "ABCDE"), # rfc string + (b"\x66\x08\x01ABCDE", b"ABCDE"), # rfc octets + (b"\x67\x07\x01\0\0\0\0", 0), # rfc date + # TODO: ABINARY + (b"\x69\x04\x01\x00", 0), # rfc byte + num_tlv(b"\x6a\x05\x01", 0, 2), # rfc short + num_tlv(b"\x6a\x05\x01", 0xFF, 2), # rfc short + num_tlv(b"\x6a\x05\x01", 0x100, 2), # rfc short + num_tlv(b"\x6a\x05\x01", 0xFFFF, 2), # rfc short + num_tlv(b"\x6b\x06\x01", 0, 3), # rfc integer + num_tlv(b"\x6b\x06\x01", 0xFF, 3), # rfc integer + num_tlv(b"\x6b\x06\x01", 0x100, 3), # rfc integer + num_tlv(b"\x6b\x06\x01", 0xFFFF, 3), # rfc integer + num_tlv(b"\x6b\x06\x01", 0x10000, 3), # rfc integer + # integer with tag is only 3 bytes + # num_tlv(b"\x70\x06\x01", 0xFFFFFFFF, 4), # rfc integer + num_tlv(b"\x6c\x07\x01", 0, 4), # rfc signed + num_tlv(b"\x6c\x07\x01", 0xFF, 4), # rfc signed + num_tlv(b"\x6c\x07\x01", 0x1000, 4), # rfc signed + num_tlv(b"\x6c\x07\x01", 0xFFFF, 4), # rfc signed + num_tlv(b"\x6c\x07\x01", 0x10000, 4), # rfc signed + num_tlv(b"\x6c\x07\x01", 0xFFFFFFFF, 4, -1), # rfc signed + num_tlv(b"\x6c\x07\x01", 0x80000000, 4, -2147483648), # rfc signed + num_tlv(b"\x6c\x07\x01", 0x7FFFFFFF, 4, 2147483647), # rfc signed + num_tlv(b"\x6d\x0B\x01", 0, 8), # rfc integer64 + num_tlv(b"\x6d\x0B\x01", 0xFF, 8), # rfc integer64 + num_tlv(b"\x6d\x0B\x01", 0x100, 8), # rfc integer64 + num_tlv(b"\x6d\x0B\x01", 0xFFFF, 8), # rfc integer64 + num_tlv(b"\x6d\x0B\x01", 0x10000, 8), # rfc integer64 + num_tlv(b"\x6d\x0B\x01", 0xFFFFFFFF, 8), # rfc integer64 + num_tlv(b"\x6d\x0B\x01", 0x100000000, 8), # rfc integer64 + num_tlv(b"\x6d\x0B\x01", 0xFFFFFFFFFFFFFFFF, 8), # rfc integer64 + (b"\x6e\x07\x01\xc0\xa8\x01\x08", IPv4Address("192.168.1.8")), + (b"\x6f\x08\x01\x10\xc0\xa8\x00\x00", IPv4Network("192.168.0.0/16")), + ( + b"\x70\x13\x01\x20\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01", + IPv6Address("2003::1"), + ), + ( + b"\x71\x15\x01\x00\x40\x20\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", + IPv6Network("2003::0/64"), + ), + (b"\x70\x05\x01\x20\x03", IPv6Address("2003::0")), + (b"\x71\x07\x01\x00\x40\x20\x03", IPv6Network("2003::0/64")), +] + + @pytest.fixture def radius_dictionary(): return dictionary.Dictionary("tests/dictionaries/dict") @@ -35,65 +134,17 @@ def test_invalid_header(header): utils.parse_header(header) -def num_tlv(num_type, num, length, expected=None): - exp = num if expected is None else expected - return (num_type + num.to_bytes(length, "big"), exp) - - -@pytest.mark.parametrize( - "attr_bytes, expected", - [ - (b"\x01\x07ABCDE", "ABCDE"), # rfc string - (b"\x02\x07ABCDE", b"ABCDE"), # rfc octets - (b"\x03\x06\0\0\0\0", 0), # rfc date - # TODO: ABINARY - (b"\x05\x03\x00", 0), # rfc byte - num_tlv(b"\x06\x04", 0, 2), # rfc short - num_tlv(b"\x06\x04", 0xFF, 2), # rfc short - num_tlv(b"\x06\x04", 0x100, 2), # rfc short - num_tlv(b"\x06\x04", 0xFFFF, 2), # rfc short - num_tlv(b"\x07\x06", 0, 4), # rfc integer - num_tlv(b"\x07\x06", 0xFF, 4), # rfc integer - num_tlv(b"\x07\x06", 0x100, 4), # rfc integer - num_tlv(b"\x07\x06", 0xFFFF, 4), # rfc integer - num_tlv(b"\x07\x06", 0x10000, 4), # rfc integer - num_tlv(b"\x07\x06", 0xFFFFFFFF, 4), # rfc integer - num_tlv(b"\x08\x06", 0, 4), # rfc signed - num_tlv(b"\x08\x06", 0xFF, 4), # rfc signed - num_tlv(b"\x08\x06", 0x1000, 4), # rfc signed - num_tlv(b"\x08\x06", 0xFFFF, 4), # rfc signed - num_tlv(b"\x08\x06", 0x10000, 4), # rfc signed - num_tlv(b"\x08\x06", 0xFFFFFFFF, 4, -1), # rfc signed - num_tlv(b"\x08\x06", 0x80000000, 4, -2147483648), # rfc signed - num_tlv(b"\x08\x06", 0x7FFFFFFF, 4, 2147483647), # rfc signed - num_tlv(b"\x09\x0A", 0, 8), # rfc integer64 - num_tlv(b"\x09\x0A", 0xFF, 8), # rfc integer64 - num_tlv(b"\x09\x0A", 0x100, 8), # rfc integer64 - num_tlv(b"\x09\x0A", 0xFFFF, 8), # rfc integer64 - num_tlv(b"\x09\x0A", 0x10000, 8), # rfc integer64 - num_tlv(b"\x09\x0A", 0xFFFFFFFF, 8), # rfc integer64 - num_tlv(b"\x09\x0A", 0x100000000, 8), # rfc integer64 - num_tlv(b"\x09\x0A", 0xFFFFFFFFFFFFFFFF, 8), # rfc integer64 - (b"\x0a\x06\xc0\xa8\x01\x08", IPv4Address("192.168.1.8")), - (b"\x0b\x07\x10\xc0\xa8\x00\x00", IPv4Network("192.168.0.0/16")), - ( - b"\x0c\x12\x20\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01", - IPv6Address("2003::1"), - ), - ( - b"\x0d\x14\x00\x40\x20\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", - IPv6Network("2003::0/64"), - ), - (b"\x0c\x04\x20\x03", IPv6Address("2003::0")), - (b"\x0d\x06\x00\x40\x20\x03", IPv6Network("2003::0/64")), - ], -) -def test_parse_attribute_rfc_and_vsa(radius_dictionary, attr_bytes, expected): +@pytest.mark.parametrize("attr_bytes, expected", TEST_ATTRIBUTES) +def test_parse_attribute_rfc(radius_dictionary, attr_bytes, expected): raw_packet = bytes(20) + attr_bytes attrs = utils.parse_attributes(radius_dictionary, raw_packet) assert len(attrs) == 1 assert attrs[0].value == expected + assert attrs[0].tag == 0 + +@pytest.mark.parametrize("attr_bytes, expected", TEST_ATTRIBUTES) +def test_parse_attribute_vsa(radius_dictionary, attr_bytes, expected): vsa_length = (6 + len(attr_bytes)).to_bytes(1, "big") raw_packet = ( bytes(20) + b"\x1a" + vsa_length + b"\x00\x00\x04\xd2" + attr_bytes @@ -101,6 +152,28 @@ def test_parse_attribute_rfc_and_vsa(radius_dictionary, attr_bytes, expected): attrs = utils.parse_attributes(radius_dictionary, raw_packet) assert len(attrs) == 1 assert attrs[0].value == expected + assert attrs[0].tag == 0 + + +@pytest.mark.parametrize("attr_bytes, expected", TAGGED_ATTRIBUTES) +def test_parse_attribute_rfc_tagged(radius_dictionary, attr_bytes, expected): + raw_packet = bytes(20) + attr_bytes + attrs = utils.parse_attributes(radius_dictionary, raw_packet) + assert len(attrs) == 1 + assert attrs[0].value == expected + assert attrs[0].tag == 1 + + +@pytest.mark.parametrize("attr_bytes, expected", TAGGED_ATTRIBUTES) +def test_parse_attribute_vsa_tagged(radius_dictionary, attr_bytes, expected): + vsa_length = (6 + len(attr_bytes)).to_bytes(1, "big") + raw_packet = ( + bytes(20) + b"\x1a" + vsa_length + b"\x00\x00\x04\xd2" + attr_bytes + ) + attrs = utils.parse_attributes(radius_dictionary, raw_packet) + assert len(attrs) == 1 + assert attrs[0].value == expected + assert attrs[0].tag == 1 @pytest.mark.parametrize(