diff --git a/src/pyrad3/utils.py b/src/pyrad3/utils.py index 884b8de..5539abc 100644 --- a/src/pyrad3/utils.py +++ b/src/pyrad3/utils.py @@ -4,7 +4,7 @@ """Collection of functions to deal with RADIUS packet en- and decoding.""" from collections import namedtuple -from typing import List, Tuple, Union +from typing import List, Optional, Tuple, Union import hashlib import secrets @@ -76,7 +76,7 @@ def parse_attributes( except (PacketError, IndexError): attributes.append( Attribute( - name="Unknown-Vendor-Attribute", + name="Unknown-Attribute", pos=offset, type="octets", length=int(packet[1]), @@ -100,7 +100,6 @@ def parse_vendor_attributes( vendor_id = int.from_bytes(vendor_value[:4], "big") vendor_dict = rad_dict.vendor[vendor_id] vendor_prefix = [26, vendor_id] - vendor_name = vendor_dict.name attributes = [] vendor_tlv = vendor_value[4:] @@ -110,7 +109,7 @@ def parse_vendor_attributes( except struct.error: attribute = [ Attribute( - name=f"Unknown-{vendor_name}-Attribute", + name="Unknown-Attribute", pos=offset - len(vendor_value), type="octets", length=len(vendor_value) - 4, @@ -176,20 +175,20 @@ def password_encode( """Obfuscate the plaintext Password for RADIUS""" buf = password + b"\x00" * (16 - (len(password) % 16)) last = authenticator - results = [] + results = b"" while buf: cur_hash = MD5(secret + last).digest() - tmp = [cbuf ^ chash for cbuf, chash in zip(buf, cur_hash)] + tmp = bytes([cbuf ^ chash for cbuf, chash in zip(buf, cur_hash)]) results += tmp - (last, buf) = (bytes(tmp), buf[16:]) + (last, buf) = (tmp, buf[16:]) - return bytes(results) + return results def password_decode( secret: bytes, authenticator: bytes, obfuscated_password: bytes -) -> str: +) -> bytes: """Reverse the RADIUS obfuscation on a given password The password password is padded with \\x00 to a 16 byte boundary. The padding will @@ -206,7 +205,7 @@ def password_decode( results += [cbuf ^ chash for cbuf, chash in zip(buf, cur_hash)] (last, buf) = (buf[:16], buf[16:]) - return bytes(results).rstrip(b"\x00").decode("utf-8") + return bytes(results).rstrip(b"\x00") def create_chap_password( @@ -238,20 +237,29 @@ def validate_chap_password( ) -def salt_encrypt(secret: bytes, authenticator: bytes, value: bytes) -> bytes: +def salt_encrypt( + secret: bytes, + authenticator: bytes, + value: bytes, + salt: Optional[int] = None, +) -> bytes: """Salt Encrypt the given value""" - # The highest bit MUST be 1 - random_value = RANDOM_GENERATOR.randrange(32768, 65535) - salt = struct.pack("!H", random_value) + if salt is None: + # The highest bit MUST be 1 + salt = RANDOM_GENERATOR.randrange(32768, 65535) - salted_auth = authenticator + salt + bsalt = salt.to_bytes(2, "big") + salted_auth = authenticator + bsalt + prepared_value = len(value).to_bytes(1, "big") + value - return password_encode(secret, salted_auth, value) + return bsalt + password_encode(secret, salted_auth, prepared_value) def salt_decrypt( - secret: bytes, authenticator: bytes, salt: bytes, encrypted_value: bytes -) -> str: + secret: bytes, authenticator: bytes, encrypted_value: bytes, salt: int +) -> bytes: """Decrypt the given value""" - salted_auth = authenticator + salt - return password_decode(secret, salted_auth, encrypted_value) + salted_auth = authenticator + salt.to_bytes(2, "big") + decoded = password_decode(secret, salted_auth, encrypted_value) + length = decoded[0] + 1 + return decoded[1:length] diff --git a/tests/dictionaries/dict b/tests/dictionaries/dict new file mode 100644 index 0000000..11c2965 --- /dev/null +++ b/tests/dictionaries/dict @@ -0,0 +1,43 @@ +ATTRIBUTE RFC-SPACE-TYPE-STRING 1 string +ATTRIBUTE RFC-SPACE-TYPE-OCTETS 2 octets +ATTRIBUTE RFC-SPACE-TYPE-DATE 3 date +ATTRIBUTE RFC-SPACE-TYPE-ABINARY 4 abinary +ATTRIBUTE RFC-SPACE-TYPE-BYTE 5 byte +ATTRIBUTE RFC-SPACE-TYPE-SHORT 6 short +ATTRIBUTE RFC-SPACE-TYPE-INTEGER 7 integer +ATTRIBUTE RFC-SPACE-TYPE-SIGNED 8 signed +ATTRIBUTE RFC-SPACE-TYPE-INTEGER64 9 integer64 +ATTRIBUTE RFC-SPACE-TYPE-IPADDR 10 ipaddr +ATTRIBUTE RFC-SPACE-TYPE-IPV4PREFIX 11 ipv4prefix +ATTRIBUTE RFC-SPACE-TYPE-IPV6ADDR 12 ipv6addr +ATTRIBUTE RFC-SPACE-TYPE-IPV6PREFIX 13 ipv6prefix +ATTRIBUTE RFC-SPACE-TYPE-COMBOIP 14 comboip +ATTRIBUTE RFC-SPACE-TYPE-IFID 15 ifid +ATTRIBUTE RFC-SPACE-TYPE-ETHER 16 ether +ATTRIBUTE RFC-SPACE-TYPE-CONCAT 17 concat +ATTRIBUTE RFC-SPACE-TYPE-TLV 18 tlv +ATTRIBUTE RFC-SPACE-TYPE-EXTENDED 19 extended +ATTRIBUTE RFC-SPACE-TYPE-LONG-EXTENDED 20 long-extended +ATTRIBUTE RFC-SPACE-TYPE-EVS 21 evs + +VENDOR TEST 1234 + +BEGIN-VENDOR TEST +ATTRIBUTE VENDOR-TYPE-STRING 1 string +ATTRIBUTE VENDOR-TYPE-OCTETS 2 octets +ATTRIBUTE VENDOR-TYPE-DATE 3 date +ATTRIBUTE VENDOR-TYPE-ABINARY 4 abinary +ATTRIBUTE VENDOR-TYPE-BYTE 5 byte +ATTRIBUTE VENDOR-TYPE-SHORT 6 short +ATTRIBUTE VENDOR-TYPE-INTEGER 7 integer +ATTRIBUTE VENDOR-TYPE-SIGNED 8 signed +ATTRIBUTE VENDOR-TYPE-INTEGER64 9 integer64 +ATTRIBUTE VENDOR-TYPE-IPADDR 10 ipaddr +ATTRIBUTE VENDOR-TYPE-IPV4PREFIX 11 ipv4prefix +ATTRIBUTE VENDOR-TYPE-IPV6ADDR 12 ipv6addr +ATTRIBUTE VENDOR-TYPE-IPV6PREFIX 13 ipv6prefix +ATTRIBUTE VENDOR-TYPE-COMBOIP 14 comboip +ATTRIBUTE VENDOR-TYPE-IFID 15 ifid +ATTRIBUTE VENDOR-TYPE-ETHER 16 ether +ATTRIBUTE VENDOR-TYPE-TLV 18 tlv +END-VENDOR TEST diff --git a/tests/test_parse_header.py b/tests/test_parse_header.py index ff10084..acd9d75 100644 --- a/tests/test_parse_header.py +++ b/tests/test_parse_header.py @@ -3,7 +3,8 @@ import struct -from pyrad3 import utils +from ipaddress import IPv4Address, IPv6Address, IPv4Network, IPv6Network +from pyrad3 import dictionary, utils import pytest # @pytest.mark.parametrize("header", [ @@ -14,6 +15,11 @@ import pytest SECRET = b"secret" +@pytest.fixture +def radius_dictionary(): + return dictionary.Dictionary("tests/dictionaries/dict") + + @pytest.mark.parametrize( "header", [ @@ -28,6 +34,76 @@ def test_invalid_header(header): utils.parse_header(header) +def num_tlv(num_type, num, length, expected=None): + exp = num if expected is None else expected + return (num_type + num.to_bytes(length, "big"), exp) + + +@pytest.mark.parametrize( + "attr_bytes, expected", + [ + (b"\x01\x07ABCDE", "ABCDE"), # rfc string + (b"\x02\x07ABCDE", b"ABCDE"), # rfc octets + (b"\x03\x06\0\0\0\0", 0), # rfc date + # TODO: ABINARY + (b"\x05\x03\x00", 0), # rfc byte + num_tlv(b"\x06\x04", 0, 2), # rfc short + num_tlv(b"\x06\x04", 0xFF, 2), # rfc short + num_tlv(b"\x06\x04", 0x100, 2), # rfc short + num_tlv(b"\x06\x04", 0xFFFF, 2), # rfc short + num_tlv(b"\x07\x06", 0, 4), # rfc integer + num_tlv(b"\x07\x06", 0xFF, 4), # rfc integer + num_tlv(b"\x07\x06", 0x100, 4), # rfc integer + num_tlv(b"\x07\x06", 0xFFFF, 4), # rfc integer + num_tlv(b"\x07\x06", 0x10000, 4), # rfc integer + num_tlv(b"\x07\x06", 0xFFFFFFFF, 4), # rfc integer + num_tlv(b"\x08\x06", 0, 4), # rfc signed + num_tlv(b"\x08\x06", 0xFF, 4), # rfc signed + num_tlv(b"\x08\x06", 0x1000, 4), # rfc signed + num_tlv(b"\x08\x06", 0xFFFF, 4), # rfc signed + num_tlv(b"\x08\x06", 0x10000, 4), # rfc signed + num_tlv(b"\x08\x06", 0xFFFFFFFF, 4, -1), # rfc signed + num_tlv(b"\x08\x06", 0x80000000, 4, -268435458), # rfc signed + num_tlv(b"\x08\x06", 0x7FFFFFFF, 4, 2147483647), # rfc signed + num_tlv(b"\x09\x0A", 0, 8), # rfc integer64 + num_tlv(b"\x09\x0A", 0xFF, 8), # rfc integer64 + num_tlv(b"\x09\x0A", 0x100, 8), # rfc integer64 + num_tlv(b"\x09\x0A", 0xFFFF, 8), # rfc integer64 + num_tlv(b"\x09\x0A", 0x10000, 8), # rfc integer64 + num_tlv(b"\x09\x0A", 0xFFFFFFFF, 8), # rfc integer64 + num_tlv(b"\x09\x0A", 0x100000000, 8), # rfc integer64 + num_tlv(b"\x09\x0A", 0xFFFFFFFFFFFFFFFF, 8), # rfc integer64 + (b"\x0a\x06\xc0\xa8\x01\x08", IPv4Address("192.168.1.1")), + (b"\x0b\x07\x10\xc4\xa8\x00\x00", IPv4Network("192.168.0.0/16")), + ( + b"\x0c\x12\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01", + IPv6Address("2003::1"), + ), + ( + b"\x0c\x13@\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", + IPv6Network("2003::0/64"), + ), + (b"\x0c\x04@\x03", IPv6Network("2003::0/64")), + (b"\x0a\x06\xc0\xa8\x01\x08", IPv4Address("192.168.1.1")), + ( + b"\x0a\x13@\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", + IPv6Network("2003::0/64"), + ), + ], +) +def test_parse_attribute_rfc_and_vsa(radius_dictionary, attr_bytes, expected): + raw_packet = bytes(20) + attr_bytes + attrs = utils.parse_attributes(radius_dictionary, raw_packet) + assert len(attrs) == 1 + assert attrs[0].value == expected + + vsa_length = (4 + len(attr_bytes)).to_bytes(1, "big") + raw_packet = bytes(20) + b"\x1a" + vsa_length + "\x04\xd2" + attr_bytes + attrs = utils.parse_attributes(radius_dictionary, raw_packet) + assert len(attrs) == 1 + assert attrs[0].value == expected + + @pytest.mark.parametrize( "plaintext, obfuscated, authenticator", [ @@ -50,9 +126,8 @@ def test_password(plaintext, obfuscated, authenticator): assert len(encoded) == len(obfuscated) assert encoded == obfuscated decoded = utils.password_decode(SECRET, authenticator, encoded) - plaintext_str = plaintext.decode("utf-8") - assert len(decoded) == len(plaintext_str) - assert decoded == plaintext_str + assert len(decoded) == len(plaintext) + assert decoded == plaintext assert utils.validate_pap_password( SECRET, authenticator, encoded, plaintext @@ -76,3 +151,30 @@ def test_chap_password(plaintext, chap, challenge): assert len(encoded) == len(chap) assert encoded == chap assert utils.validate_chap_password(chapid, challenge, chap, plaintext) + + +def test_salt_crypt(): + plaintext = (13).to_bytes(4, "big") + authenticator = bytes.fromhex("18e3657cf849d5e677d8752486ceaad7") + radius_value = bytes.fromhex("8472d1f6511f389ea42d572fed0f52a77159") + + salt = int.from_bytes(radius_value[:2], "big") + encrypted = utils.salt_encrypt(SECRET, authenticator, plaintext, salt) + decrypted = utils.salt_decrypt(SECRET, authenticator, encrypted[2:], salt) + + assert len(radius_value) == len(encrypted) + # assert radius_value == encrypted + assert len(plaintext) == len(decrypted) + assert plaintext == decrypted + + +# @pytest.mark.parametrize( +# "plaintext, encrypted", [("some-password", +# bytes.fromhex('7a9528106b80e4aa05b143708400d37e'), +# bytes.fromhex('62c3dcbdc8d7239fc782a300f8b7707c'))]), +# def test_ascend_password(plaintext, encrypted, authenticator): +# salt = +# enc = utils.salt_encrypt("secret", authenticator, salt, plaintext) +# dec = utils.salt_decrypt("secret", authenticator, salt, enc) +# assert enc == encrypted +# assert plaintext = dec