add: initial evs support

This commit is contained in:
Istvan Ruzman
2022-02-21 16:36:52 +01:00
parent 10d7f0b7f4
commit d970df0dd7
4 changed files with 152 additions and 99 deletions

View File

@@ -7,6 +7,7 @@ Classes and Types to parse and represent a RADIUS dictionary.
""" """
import logging import logging
from contextlib import suppress
from os.path import dirname, isabs, join, normpath from os.path import dirname, isabs, join, normpath
from typing import IO, Dict, Generator, List, Optional, Sequence, Tuple, Union from typing import IO, Dict, Generator, List, Optional, Sequence, Tuple, Union
@@ -16,11 +17,11 @@ LOG = logging.getLogger(__name__)
INTEGER_TYPES: Dict[str, Tuple[int, int]] = { INTEGER_TYPES: Dict[str, Tuple[int, int]] = {
"byte": (0, 255), "BYTE": (0, 255),
"short": (0, 2 ** 16 - 1), "SHORT": (0, 2 ** 16 - 1),
"signed": (-(2 ** 31), 2 ** 31 - 1), "SIGNED": (-(2 ** 31), 2 ** 31 - 1),
"integer": (0, 2 ** 32 - 1), "INTEGER": (0, 2 ** 32 - 1),
"integer64": (0, 2 ** 64 - 1), "INTEGER64": (0, 2 ** 64 - 1),
} }
@@ -111,6 +112,7 @@ class Dictionary:
self.attrindex: Dict[Union[str, int, Tuple[int, ...]], Attribute] = {} self.attrindex: Dict[Union[str, int, Tuple[int, ...]], Attribute] = {}
self.rfc_vendor = Vendor("RFC", 0, 1, 1, False, {}) self.rfc_vendor = Vendor("RFC", 0, 1, 1, False, {})
self.cur_vendor = self.rfc_vendor self.cur_vendor = self.rfc_vendor
self.evs: Dict[str, List[int]] = {}
if __dictio is not None: if __dictio is not None:
loader = dict_parser(dictionary, __dictio) loader = dict_parser(dictionary, __dictio)
else: else:
@@ -120,11 +122,12 @@ class Dictionary:
def read_dictionary(self, reader: Generator[Tuple[int, List[str]], None, None]): def read_dictionary(self, reader: Generator[Tuple[int, List[str]], None, None]):
"""Read and parse a (Free)RADIUS dictionary.""" """Read and parse a (Free)RADIUS dictionary."""
self.filestack: List[str] = [] self.filestack: List[str] = []
cur_evs: List[int] = []
for line_num, tokens in reader: for line_num, tokens in reader:
key = tokens[0] key = tokens[0]
if key == "ATTRIBUTE": if key == "ATTRIBUTE":
# logging is done within the method # logging is done within the method
self._parse_attribute(tokens, line_num) self._parse_attribute(tokens, line_num, cur_evs)
elif key == "VALUE": elif key == "VALUE":
# logging is done within the method # logging is done within the method
self._parse_value(tokens, line_num) self._parse_value(tokens, line_num)
@@ -140,10 +143,11 @@ class Dictionary:
self._parse_vendor(tokens, line_num) self._parse_vendor(tokens, line_num)
LOG.info("Register vendor %s", tokens[1]) LOG.info("Register vendor %s", tokens[1])
elif key == "BEGIN-VENDOR": elif key == "BEGIN-VENDOR":
self._parse_begin_vendor(tokens, line_num) cur_evs = self._parse_begin_vendor(tokens, line_num)
LOG.info("Open Vendor section %s", tokens[1]) LOG.info("Open Vendor section %s", tokens[1])
elif key == "END-VENDOR": elif key == "END-VENDOR":
self._parse_end_vendor(tokens, line_num) self._parse_end_vendor(tokens, line_num)
cur_evs = []
LOG.info("Close Vendor section %s", tokens[1]) LOG.info("Close Vendor section %s", tokens[1])
elif key == "BEGIN-TLV": elif key == "BEGIN-TLV":
raise NotImplementedError( raise NotImplementedError(
@@ -164,16 +168,12 @@ class Dictionary:
vendor_name = tokens[1] vendor_name = tokens[1]
vendor_id = int(tokens[2], 0) vendor_id = int(tokens[2], 0)
continuation = False continuation = False
t_len, l_len = 1, 1
# Parse optional vendor specification # Parse optional vendor specification
try: with suppress(IndexError):
vendor_format = tokens[3].split("=") vendor_format = tokens[3].split("=")
if vendor_format[0] != "format": if vendor_format[0] == "format":
raise ParseError(
filename,
f"Unknown option {vendor_format[0]} for vendor definition",
line_num,
)
try: try:
vendor_format = vendor_format[1].split(",") vendor_format = vendor_format[1].split(",")
t_len, l_len = (int(a) for a in vendor_format[:2]) t_len, l_len = (int(a) for a in vendor_format[:2])
@@ -189,7 +189,7 @@ class Dictionary:
f'Invalid length definition "{l_len}" for vendor {vendor_name}', f'Invalid length definition "{l_len}" for vendor {vendor_name}',
line_num, line_num,
) )
try: with suppress(IndexError):
if vendor_format[2] == "c": if vendor_format[2] == "c":
if not vendor_name.upper() == "WIMAX": if not vendor_name.upper() == "WIMAX":
# Not sure why, but FreeRADIUS has this limit, # Not sure why, but FreeRADIUS has this limit,
@@ -200,37 +200,61 @@ class Dictionary:
line_num, line_num,
) )
continuation = True continuation = True
except IndexError: except IndexError as exc:
pass raise ParseError(
filename,
f"Invalid format definition for vendor {vendor_name}",
line_num,
) from exc
except ValueError as exc: except ValueError as exc:
raise ParseError( raise ParseError(
filename, filename,
f"Syntax error in specification for vendor {vendor_name}", f"Syntax error in specification for vendor {vendor_name}",
line_num, line_num,
) from exc ) from exc
except IndexError: else:
# no format definition raise ParseError(
t_len, l_len = 1, 1 filename,
f"Unknown option {vendor_format[0]} for vendor definition",
line_num,
)
vendor = Vendor(vendor_name, vendor_id, t_len, l_len, continuation, {}) vendor = Vendor(vendor_name, vendor_id, t_len, l_len, continuation, {})
self.vendor_lookup_id_by_name[vendor_name] = vendor_id self.vendor_lookup_id_by_name[vendor_name] = vendor_id
self.vendor[vendor_id] = vendor self.vendor[vendor_id] = vendor
def _parse_begin_vendor(self, tokens: Sequence[str], line_num: int): def _parse_begin_vendor(self, tokens: Sequence[str], line_num: int) -> List[int]:
"""Parse the BEGIN-VENDOR line of (Free)RADIUS dictionaries.""" """Parse the BEGIN-VENDOR line of (Free)RADIUS dictionaries."""
filename = self.filestack[-1] filename = self.filestack[-1]
evs = []
if self.cur_vendor != self.rfc_vendor: if self.cur_vendor != self.rfc_vendor:
raise ParseError( raise ParseError(
filename, filename,
"vendor-begin sections are not allowed to be nested", "vendor-begin sections are not allowed to be nested",
line_num, line_num,
) )
if len(tokens) != 2:
if len(tokens) not in [2, 3]:
raise ParseError( raise ParseError(
filename, filename,
"Incorrect number of tokens for begin-vendor statement", "Incorrect number of tokens for begin-vendor statement",
line_num, line_num,
) )
with suppress(IndexError):
vendor_format = tokens[2].split("=")
if vendor_format[0] != "format":
raise ParseError(
filename, f"Invalid token {tokens[3]} for vendor begin", line_num
)
if vendor_format[1].upper() in self.evs:
try:
evs = self.evs[vendor_format[1].upper()]
except KeyError as exc:
raise ParseError(
filename, f"Unknown EVS {vendor_format[1]}", line_num
) from exc
try: try:
vendor_id = self.vendor_lookup_id_by_name[tokens[1]] vendor_id = self.vendor_lookup_id_by_name[tokens[1]]
self.cur_vendor = self.vendor[vendor_id] self.cur_vendor = self.vendor[vendor_id]
@@ -240,6 +264,7 @@ class Dictionary:
f"Unknown vendor {tokens[1]} in begin-vendor statement", f"Unknown vendor {tokens[1]} in begin-vendor statement",
line_num, line_num,
) from exc ) from exc
return evs
def _parse_end_vendor(self, tokens: Sequence[str], line_num: int): def _parse_end_vendor(self, tokens: Sequence[str], line_num: int):
"""Parse the END-VENDOR line of (Free)RADIUS dictionaries.""" """Parse the END-VENDOR line of (Free)RADIUS dictionaries."""
@@ -299,10 +324,12 @@ class Dictionary:
return has_tag, encrypt return has_tag, encrypt
def _parse_attribute_code(self, attr_code: str, line_num: int) -> List[int]: def _parse_attribute_code(
self, attr_code: str, line_num: int, evs: List[int]
) -> List[int]:
filename = self.filestack[-1] filename = self.filestack[-1]
tlength = self.cur_vendor.tlength tlength = self.cur_vendor.tlength
codes = [] codes = evs.copy()
for code in attr_code.split("."): for code in attr_code.split("."):
try: try:
code_num = _parse_number(code) code_num = _parse_number(code)
@@ -325,7 +352,7 @@ class Dictionary:
codes.append(code_num) codes.append(code_num)
return codes return codes
def _parse_attribute(self, tokens: Sequence[str], line_num: int): def _parse_attribute(self, tokens: Sequence[str], line_num: int, evs: List[int]):
"""Parse an ATTRIBUTE line of (Free)RADIUS dictionaries.""" """Parse an ATTRIBUTE line of (Free)RADIUS dictionaries."""
filename = self.filestack[-1] filename = self.filestack[-1]
if not len(tokens) in [4, 5]: if not len(tokens) in [4, 5]:
@@ -348,7 +375,7 @@ class Dictionary:
line_num, line_num,
) )
codes = self._parse_attribute_code(attr_code, line_num) codes = self._parse_attribute_code(attr_code, line_num, evs)
# TODO: Do we some explicit handling of tlvs? # TODO: Do we some explicit handling of tlvs?
# if len(codes) > 1: # if len(codes) > 1:
@@ -358,13 +385,13 @@ class Dictionary:
base_datatype = datatype.split("[")[0].replace("-", "") base_datatype = datatype.split("[")[0].replace("-", "")
try: try:
attribute_type = Datatype[base_datatype] attribute_type = Datatype[base_datatype.upper()]
except KeyError as exc: except KeyError as exc:
raise ParseError(filename, f"Illegal type: {datatype}", line_num) from exc raise ParseError(filename, f"Illegal type: {datatype}", line_num) from exc
attribute = Attribute( attribute = Attribute(
name, name,
codes[-1], codes, # [-1],
attribute_type, attribute_type,
{}, {},
has_tag, has_tag,
@@ -377,6 +404,11 @@ class Dictionary:
) )
self.cur_vendor.attrs[attrcode] = attribute self.cur_vendor.attrs[attrcode] = attribute
if datatype == "evs":
if not isinstance(attrcode, tuple):
raise ParseError(filename, "evs must be a tlv", line_num)
self.evs[name.upper()] = list(attrcode)
if self.cur_vendor != self.rfc_vendor: if self.cur_vendor != self.rfc_vendor:
codes = [26, self.cur_vendor.code] + codes codes = [26, self.cur_vendor.code] + codes
LOG.info( LOG.info(
@@ -433,8 +465,7 @@ class Dictionary:
except KeyError as exc: except KeyError as exc:
raise ParseError( raise ParseError(
filename, filename,
f"only attributes with integer typed datatypes can have" f"only attributes with integer typed datatypes can have value definitions {attribute.datatype}",
f"value definitions {attribute.datatype}",
line_num, line_num,
) from exc ) from exc
attribute.values[value] = key attribute.values[value] = key

View File

@@ -10,7 +10,7 @@ we don't support them (yet).
from dataclasses import dataclass from dataclasses import dataclass
from enum import Enum, IntEnum, auto from enum import Enum, IntEnum, auto
from typing import Dict, Tuple, Union from typing import Dict, List, Tuple, Union
class Code(IntEnum): class Code(IntEnum):
@@ -78,7 +78,7 @@ class Attribute: # pylint: disable=too-many-instance-attributes
"""RADIUS Attribute definition""" """RADIUS Attribute definition"""
name: str name: str
code: int code: Union[int, List[int]]
datatype: Datatype datatype: Datatype
values: Dict[Union[int, str], Union[int, str]] values: Dict[Union[int, str], Union[int, str]]
has_tag: bool = False has_tag: bool = False

View File

@@ -40,44 +40,6 @@ ATTRIBUTE RFC-SPACE-TAGGED-COMBOIP 114 comboip has_tag
ATTRIBUTE RFC-SPACE-TAGGED-IFID 115 ifid has_tag ATTRIBUTE RFC-SPACE-TAGGED-IFID 115 ifid has_tag
ATTRIBUTE RFC-SPACE-TAGGED-ETHER 116 ether has_tag ATTRIBUTE RFC-SPACE-TAGGED-ETHER 116 ether has_tag
# TODO How are EVS meant to be specified?
# ATTRIBUTE VENDOR10-EVS-TYPE-STRING 19.21.1234.1 string
# ATTRIBUTE VENDOR10-EVS-TYPE-OCTETS 19.21.1234.2 octets
# ATTRIBUTE VENDOR10-EVS-TYPE-DATE 19.21.1234.3 date
# ATTRIBUTE VENDOR10-EVS-TYPE-ABINARY 19.21.1234.4 abinary
# ATTRIBUTE VENDOR10-EVS-TYPE-BYTE 19.21.1234.5 byte
# ATTRIBUTE VENDOR10-EVS-TYPE-SHORT 19.21.1234.6 short
# ATTRIBUTE VENDOR10-EVS-TYPE-INTEGER 19.21.1234.7 integer
# ATTRIBUTE VENDOR10-EVS-TYPE-SIGNED 19.21.1234.8 signed
# ATTRIBUTE VENDOR10-EVS-TYPE-INTEGER64 19.21.1234.9 integer64
# ATTRIBUTE VENDOR10-EVS-TYPE-IPADDR 19.21.1234.10 ipaddr
# ATTRIBUTE VENDOR10-EVS-TYPE-IPV4PREFIX 19.21.1234.11 ipv4prefix
# ATTRIBUTE VENDOR10-EVS-TYPE-IPV6ADDR 19.21.1234.12 ipv6addr
# ATTRIBUTE VENDOR10-EVS-TYPE-IPV6PREFIX 19.21.1234.13 ipv6prefix
# ATTRIBUTE VENDOR10-EVS-TYPE-COMBOIP 19.21.1234.14 comboip
# ATTRIBUTE VENDOR10-EVS-TYPE-IFID 19.21.1234.15 ifid
# ATTRIBUTE VENDOR10-EVS-TYPE-ETHER 19.21.1234.16 ether
# ATTRIBUTE VENDOR10-EVS-TYPE-TLV 19.21.1234.18 tlv
#
# ATTRIBUTE VENDOR10-LONG-EVS-TYPE-STRING 20.21.1234.1 string
# ATTRIBUTE VENDOR10-LONG-EVS-TYPE-OCTETS 20.21.1234.2 octets
# ATTRIBUTE VENDOR10-LONG-EVS-TYPE-DATE 20.21.1234.3 date
# ATTRIBUTE VENDOR10-LONG-EVS-TYPE-ABINARY 20.21.1234.4 abinary
# ATTRIBUTE VENDOR10-LONG-EVS-TYPE-BYTE 20.21.1234.5 byte
# ATTRIBUTE VENDOR10-LONG-EVS-TYPE-SHORT 20.21.1234.6 short
# ATTRIBUTE VENDOR10-LONG-EVS-TYPE-INTEGER 20.21.1234.7 integer
# ATTRIBUTE VENDOR10-LONG-EVS-TYPE-SIGNED 20.21.1234.8 signed
# ATTRIBUTE VENDOR10-LONG-EVS-TYPE-INTEGER64 20.21.1234.9 integer64
# ATTRIBUTE VENDOR10-LONG-EVS-TYPE-IPADDR 20.21.1234.10 ipaddr
# ATTRIBUTE VENDOR10-LONG-EVS-TYPE-IPV4PREFIX 20.21.1234.11 ipv4prefix
# ATTRIBUTE VENDOR10-LONG-EVS-TYPE-IPV6ADDR 20.21.1234.12 ipv6addr
# ATTRIBUTE VENDOR10-LONG-EVS-TYPE-IPV6PREFIX 20.21.1234.13 ipv6prefix
# ATTRIBUTE VENDOR10-LONG-EVS-TYPE-COMBOIP 20.21.1234.14 comboip
# ATTRIBUTE VENDOR10-LONG-EVS-TYPE-IFID 20.21.1234.15 ifid
# ATTRIBUTE VENDOR10-LONG-EVS-TYPE-ETHER 20.21.1234.16 ether
# ATTRIBUTE VENDOR10-LONG-EVS-TYPE-TLV 20.21.1234.18 tlv
VENDOR TEST10 1234 format=1,0 VENDOR TEST10 1234 format=1,0
BEGIN-VENDOR TEST10 BEGIN-VENDOR TEST10
@@ -118,6 +80,47 @@ ATTRIBUTE VENDOR10-TAGGED-ETHER 116 ether has_tag
END-VENDOR TEST10 END-VENDOR TEST10
BEGIN-VENDOR TEST10 format=RFC-SPACE-TYPE-EVS
ATTRIBUTE VENDOR10-EVS-TYPE-STRING 19.21.1234.1 string
ATTRIBUTE VENDOR10-EVS-TYPE-OCTETS 19.21.1234.2 octets
ATTRIBUTE VENDOR10-EVS-TYPE-DATE 19.21.1234.3 date
ATTRIBUTE VENDOR10-EVS-TYPE-ABINARY 19.21.1234.4 abinary
ATTRIBUTE VENDOR10-EVS-TYPE-BYTE 19.21.1234.5 byte
ATTRIBUTE VENDOR10-EVS-TYPE-SHORT 19.21.1234.6 short
ATTRIBUTE VENDOR10-EVS-TYPE-INTEGER 19.21.1234.7 integer
ATTRIBUTE VENDOR10-EVS-TYPE-SIGNED 19.21.1234.8 signed
ATTRIBUTE VENDOR10-EVS-TYPE-INTEGER64 19.21.1234.9 integer64
ATTRIBUTE VENDOR10-EVS-TYPE-IPADDR 19.21.1234.10 ipaddr
ATTRIBUTE VENDOR10-EVS-TYPE-IPV4PREFIX 19.21.1234.11 ipv4prefix
ATTRIBUTE VENDOR10-EVS-TYPE-IPV6ADDR 19.21.1234.12 ipv6addr
ATTRIBUTE VENDOR10-EVS-TYPE-IPV6PREFIX 19.21.1234.13 ipv6prefix
ATTRIBUTE VENDOR10-EVS-TYPE-COMBOIP 19.21.1234.14 comboip
ATTRIBUTE VENDOR10-EVS-TYPE-IFID 19.21.1234.15 ifid
ATTRIBUTE VENDOR10-EVS-TYPE-ETHER 19.21.1234.16 ether
ATTRIBUTE VENDOR10-EVS-TYPE-TLV 19.21.1234.18 tlv
END-VENDOR
BEGIN-VENDOR TEST10 format=RFC-SPACE-TYPE-EVS
ATTRIBUTE VENDOR10-LONG-EVS-TYPE-STRING 20.21.1234.1 string
ATTRIBUTE VENDOR10-LONG-EVS-TYPE-OCTETS 20.21.1234.2 octets
ATTRIBUTE VENDOR10-LONG-EVS-TYPE-DATE 20.21.1234.3 date
ATTRIBUTE VENDOR10-LONG-EVS-TYPE-ABINARY 20.21.1234.4 abinary
ATTRIBUTE VENDOR10-LONG-EVS-TYPE-BYTE 20.21.1234.5 byte
ATTRIBUTE VENDOR10-LONG-EVS-TYPE-SHORT 20.21.1234.6 short
ATTRIBUTE VENDOR10-LONG-EVS-TYPE-INTEGER 20.21.1234.7 integer
ATTRIBUTE VENDOR10-LONG-EVS-TYPE-SIGNED 20.21.1234.8 signed
ATTRIBUTE VENDOR10-LONG-EVS-TYPE-INTEGER64 20.21.1234.9 integer64
ATTRIBUTE VENDOR10-LONG-EVS-TYPE-IPADDR 20.21.1234.10 ipaddr
ATTRIBUTE VENDOR10-LONG-EVS-TYPE-IPV4PREFIX 20.21.1234.11 ipv4prefix
ATTRIBUTE VENDOR10-LONG-EVS-TYPE-IPV6ADDR 20.21.1234.12 ipv6addr
ATTRIBUTE VENDOR10-LONG-EVS-TYPE-IPV6PREFIX 20.21.1234.13 ipv6prefix
ATTRIBUTE VENDOR10-LONG-EVS-TYPE-COMBOIP 20.21.1234.14 comboip
ATTRIBUTE VENDOR10-LONG-EVS-TYPE-IFID 20.21.1234.15 ifid
ATTRIBUTE VENDOR10-LONG-EVS-TYPE-ETHER 20.21.1234.16 ether
ATTRIBUTE VENDOR10-LONG-EVS-TYPE-TLV 20.21.1234.18 tlv
END-VENDOR
VENDOR TEST11 1235 format=1,1 VENDOR TEST11 1235 format=1,1
BEGIN-VENDOR TEST11 BEGIN-VENDOR TEST11

View File

@@ -119,7 +119,8 @@ def test_valid_attribute_numbers(number):
@pytest.mark.parametrize( @pytest.mark.parametrize(
"invalid_number", ["1000", "ABCD", "-1", "inf", "INF", "-INF", "2e4", "2.5e3"], "invalid_number",
["1000", "ABCD", "-1", "inf", "INF", "-INF", "2e4", "2.5e3"],
) )
def test_invalid_attribute_numbers(invalid_number): def test_invalid_attribute_numbers(invalid_number):
dictionary = StringIO(f"ATTRIBUTE NAME {invalid_number} integer64") dictionary = StringIO(f"ATTRIBUTE NAME {invalid_number} integer64")
@@ -247,7 +248,6 @@ def test_value_number_out_of_limit(value_num, attr_type):
"tlv", "tlv",
"extended", "extended",
"long-extended", "long-extended",
"evs",
], ],
) )
def test_all_datatypes_rfc_space(datatype): def test_all_datatypes_rfc_space(datatype):
@@ -255,6 +255,11 @@ def test_all_datatypes_rfc_space(datatype):
Dictionary("", dictionary) Dictionary("", dictionary)
def test_evs_datatype():
dictionary = StringIO("ATTRIBUTE TEST-ATTRIBUTE 1.10 evs\n")
Dictionary("", dictionary)
@pytest.mark.parametrize( @pytest.mark.parametrize(
"datatype", "datatype",
[ [
@@ -299,7 +304,8 @@ def test_invalid_datatypes_in_vendor_space(datatype):
@pytest.mark.parametrize( @pytest.mark.parametrize(
"invalid_number", ["ABCD", "-1", "inf", "INF", "-INF", "0.1", "2e4", "2.5e3"], "invalid_number",
["ABCD", "-1", "inf", "INF", "-INF", "0.1", "2e4", "2.5e3"],
) )
def test_invalid_value_numbers(invalid_number): def test_invalid_value_numbers(invalid_number):
dictionary = StringIO( dictionary = StringIO(
@@ -401,3 +407,16 @@ def test_get_nonexisting_attributes_from_dictionaries(attribute):
dd = Dictionary("", dictionary) dd = Dictionary("", dictionary)
with pytest.raises(KeyError): with pytest.raises(KeyError):
_ = dd[attribute] _ = dd[attribute]
def test_extended_evs():
dictionary = StringIO(
"ATTRIBUTE RFC-EXTENDED 10 extended\n"
"ATTRIBUTE RFC-EXTENDED-EVS 10.20 evs\n"
"VENDOR TEST-VENDOR 1234\n"
"BEGIN-VENDOR TEST-VENDOR format=RFC-EXTENDED-EVS\n"
"ATTRIBUTE VENDOR-ATTRIBUTE 10 integer\n"
"END-VENDOR TEST-VENDOR"
)
dd = Dictionary("", dictionary)
assert dd["VENDOR-ATTRIBUTE"].code == [10, 20, 10]