#!/usr/bin/env -S python3 -u
# -*- coding: utf-8 -*-

""" asnbl-helper [.py]

Squid helper script for enumerating the ASN (Autonomous System
Number) of an IP address and querying it against a file- or
DNS-based black- or whitelist. If a domain is given, it will be
resolved to its IP addresses, which will then be checked against
the specified black-/whitelist source.

Settings are read from the configuration file path supplied as a
command line argument. """

# Import needed modules...
import configparser
import ipaddress
import logging
import logging.handlers
import os.path
import re
import socket
import sys
import concurrent.futures
from datetime import datetime
import location # See https://location.ipfire.org/download for download and installation instructions


# Refuse to run as root (UID 0) for security reasons...
if os.getuid() == 0:
    print("For security purposes, this script must not be executed as root!")
    sys.exit(127)

# The path to the configuration file is expected as first command line argument...
if len(sys.argv) < 2:
    print("Usage: " + sys.argv[0] + " [path to configuration file]")
    sys.exit(127)

CFILE = sys.argv[1]

# Set up logging with level INFO as a default. Messages go to the syslog
# socket if "/dev/log" is a symbolic link, and to STDERR in any other case.
LOGIT = logging.getLogger('squid-asnbl-helper')
LOGIT.setLevel(logging.INFO)

if not os.path.islink("/dev/log"):
    HANDLER = logging.StreamHandler(stream=sys.stderr)
    # When writing to STDERR, timestamp, program name, PID and log level
    # have to be added by a formatter of our own...
    FORMAT = logging.Formatter(fmt="%(asctime)s %(name)s[%(process)d] %(levelname).4s: %(message)s",
                               datefmt="%b %d %H:%M:%S")
    HANDLER.setFormatter(FORMAT)
else:
    HANDLER = logging.handlers.SysLogHandler(address="/dev/log")

LOGIT.addHandler(HANDLER)


def is_ipaddress(chkinput: str):
    """ Function call: is_ipaddress(input)

    Returns True if the given input can be parsed as an IPv4 or IPv6
    address by the ipaddress module, and False otherwise."""

    try:
        ipaddress.ip_address(chkinput)
    except ValueError:
        # Parsing failed, hence this is not an IP address
        return False
    else:
        return True


def is_valid_domain(chkdomain: str):
    """ Function call: is_valid_domain(domain name)

    Checks if the given domain name is syntactically valid: It must not be
    an IP address, must not exceed 255 characters, must contain at least one
    dot, and each of its labels must consist of 1 to 63 ASCII letters, digits,
    hyphens or underscores without starting or ending with a hyphen. A single
    trailing dot is tolerated.

    It returns True if the domain is valid, and False if not."""

    # Test if chkdomain is an IP address (should not happen here)
    if is_ipaddress(chkdomain):
        return False

    if len(chkdomain) > 255 or "." not in chkdomain:
        # Do not allow domains which are very long or do not contain a dot
        return False

    if chkdomain.endswith("."):
        # Strip trailing "." if present
        chkdomain = chkdomain[:-1]

    # Allowed characters per label. re.ASCII keeps "\d" and the case-insensitive
    # "a-z" range from matching non-ASCII digits and letters, and fullmatch()
    # is used below so that a label ending in a newline is not accepted
    # (which a "$" anchor combined with match() would do).
    allowedchars = re.compile(r"(?!-)[a-z\d\-_]{1,63}(?<!-)", re.IGNORECASE | re.ASCII)

    # Every label has to match entirely; empty labels (e.g. caused by
    # consecutive dots) never do, as at least one character is required.
    return all(allowedchars.fullmatch(sublabel) for sublabel in chkdomain.split("."))


def build_reverse_ip(ipaddr):
    """ Function call: build_reverse_ip(IP address)

    This function takes an IPv4 or IPv6 address, and converts it into its
    reversed notation, so an RBL query can be performed with it (e.g.
    "192.0.2.1" becomes "1.2.0.192"). The reversed string is returned
    without any DNS zone suffix.

    If the input is not a valid IP address, None is returned."""

    try:
        addr = ipaddress.ip_address(ipaddr)
    except ValueError:
        # In this case, we are dealing with a martian
        return None

    # reverse_pointer returns the full PTR name (such as "1.2.0.192.in-addr.arpa"
    # or "[...].ip6.arpa"), so the two trailing labels are cut off here.
    return ".".join(addr.reverse_pointer.split(".")[:-2])


def resolve_addresses(domain: str):
    """ Function call: resolve_addresses(domain)

    Looks up the IPv4 and IPv6 addresses of the given domain via
    socket.getaddrinfo(), running both lookups in parallel. The addresses
    found are returned as a deduplicated set, which is empty if neither
    lookup succeeded. None is returned if the domain did not pass
    is_valid_domain()."""

    # Do not bother the resolver with anything that is not a valid domain...
    if not is_valid_domain(domain):
        return None

    # Addresses collected from both lookups
    collected = []

    with concurrent.futures.ThreadPoolExecutor() as pool:
        # One lookup per address family (A and AAAA records)...
        pending = [
            pool.submit(socket.getaddrinfo, domain, 0, family=addrfamily, type=socket.SOCK_STREAM)
            for addrfamily in (socket.AF_INET, socket.AF_INET6)
        ]

        for finished in concurrent.futures.as_completed(pending):
            try:
                # The address is the first element of the sockaddr tuple,
                # which in turn is the fifth element of each result
                collected.extend(entry[4][0] for entry in finished.result())
            except (socket.gaierror, ValueError):
                # Failed lookups are not an error here, they just do not
                # contribute any addresses...
                continue

    # Deduplicate...
    return set(collected)


def load_asnbl_file(filepath: str):
    """ Function call: load_asnbl_file(/Path/to/single/ASNBL/file)

    This reads the given file and returns a list of the ASNs found in it as
    integers. Each entry is expected on a line of its own, either as a plain
    number or prefixed with "AS"; anything following the first whitespace on
    a line is ignored. Blank lines and comments beginning with # or ; are
    skipped, and leading or trailing whitespace does not matter.

    A ValueError is raised if a line does not contain a parseable ASN."""

    with open(filepath, "r") as fhandle:
        fbuffer = fhandle.read().splitlines()

    # Temporary variable to hold list of parsed ASNs from file
    parsedasns = []

    # Convert list entries (usually strings like "ASxxx") into integers
    for singleline in fbuffer:
        entry = singleline.strip()

        # Ignore blank lines and comments beginning with # or ; (BIND syntax)...
        if not entry or entry.startswith(("#", ";")):
            continue

        # Remove the "AS" prefix, and only look at the first field of the line
        fields = entry.strip("AS").split()
        if not fields:
            raise ValueError("no ASN found in line '" + singleline + "' of file " + filepath)

        parsedasns.append(int(fields[0]))

    return parsedasns


def resolve_asn(ipaddr: str):
    """ Function call: resolve_asn(IP address to be resolved)

    This function looks up the Autonomous System for the given IP address
    in the global ASN database object (ASNDB). It returns the ASN found,
    or 0 if the lookup failed or did not yield any ASN. """

    # Fix for https://bugzilla.ipfire.org/show_bug.cgi?id=13023:
    # Initialize the result variable before it's first use, otherwise python3
    # will sometimes detect a 'mismatch' using global and local variables
    lookup_result = None

    # libloc cannot handle ipaddress objects here, so casting into a string is necessary
    # for good measure, to avoid exceptions here...
    try:
        lookup_result = ASNDB.lookup(str(ipaddr))
    except BlockingIOError:
        # XXX: Prevent likely libloc bug from causing this helper to crash
        # (see upstream bug https://bugzilla.ipfire.org/show_bug.cgi?id=13023)
        pass
    except PermissionError as error:
        # XXX: In certain circumstances not fully clarified yet, above ASNDB.lookup()
        # call will result in a PermissionError, but only on the first instance. All
        # further calls are processed normally, which is why we catch PermissionError
        # here for good measure, to ensure that this script is as functional to Squid
        # as possible. Nevertheless, it is an ugly mitigation, and further information
        # on this edge case is appreciated.
        #
        # Mind the argument order: The database object comes first, the IP address second.
        LOGIT.warning("Encountered PermissionError from ASNDB '%s' while resolving ASN for '%s'. Possibly edge case, please report this: %s",
                      ASNDB, ipaddr, str(error))

    # In case nothing was returned above (no result object, no "asn" attribute, or
    # an empty one), satisfy result expectation to this function by returning 0...
    return getattr(lookup_result, "asn", None) or 0


def asndb_response_tests(testdata: str):
    """ Function call: asndb_response_tests(response test data)

    Verifies that the ASN database returns the expected ASN for each IP
    address listed in the given test data. The test data is a string of
    IP address / ASN pairs, in which parentheses and commas are ignored.

    It returns True if all lookups matched their expectation (or if there
    was nothing to test), and False as soon as one of them did not. """

    # XXX: ConfigParser() hands over a plain string, so parentheses and commas
    # are removed first, and the remaining whitespace-separated tokens are then
    # grouped into (IP address, ASN) tuples. A trailing token without a
    # counterpart is dropped.
    tokens = re.sub(r"[\(\),]", "", testdata).split()
    testpairs = list(zip(tokens[0::2], tokens[1::2]))

    for testpair in testpairs:
        LOGIT.debug("Running response test for '%s' against ASNDB '%s' ...",
                    testpair, ASNDB)

        # Bail out on the first mismatch, there is no point in testing any further
        if resolve_asn(testpair[0]) != int(testpair[1]):
            LOGIT.error("Response test failed for ASNDB '%s' (tuple: %s), aborting",
                        ASNDB, testpair)
            return False

    return True


def check_asn_against_list(asn: int, querystring: str, asnbldomains: list, asnlist: list):
    """ Function call: check_asn_against_list(ASN to be checked,
                                              queried destination,
                                              list of active DNS-based ASNBLs,
                                              list of ASNs read from file-based ASNBLs)
    This takes an enumerated ASN - integer only, without the "AS"
    prefix commonly used -, and performs a lookup against DNS-based ASNBLs/ASNWL,
    a static list read from file-based ASNBLs, or both.

    This function returns True if an ASN matches, and False if not. Passing
    queried destination is necessary for logging root cause of listing hits.

    A socket.gaierror is raised if a DNS-based ASNBL query fails for any
    reason other than the queried name not existing. """

    # TODO: Replymap support still needs to be implemented for check_asn_against_list().

    listed = False

    for asnbldom in asnbldomains or []:
        try:
            answer = socket.getaddrinfo((str(asn) + "." + asnbldom), 0, family=socket.AF_INET, type=socket.SOCK_STREAM)
        except socket.gaierror as error:
            # NXDOMAIN means this ASN is not listed at this ASNBL, so the next
            # one is tried. The symbolic constant is used since the numeric
            # value of EAI_NONAME differs between platforms.
            if error.errno == socket.EAI_NONAME:
                continue

            # And log in case of every other socket error...
            LOGIT.warning("ASNBL '%s' failed to answer query for '%s' in time (socket error code: %s), returning 'BH'",
                          asnbldom, asn, error.errno)
            raise

        listed = True

        # Concatenate responses and log them...
        LOGIT.warning("ASNBL hit on '%s.%s' with response '%s'",
                      asn, asnbldom, " ".join(str(rdata) for rdata in answer))

        # One hit is sufficient, no need to query the remaining ASNBLs
        break

    if asnlist and asn in asnlist:
        listed = True

        LOGIT.warning("ASNBL hit on '%s', found in given ASN list (queried destination: '%s')",
                      asn, querystring)

    # If any of the queries made above was successful, return True
    return listed


def set_up_location_database(dbpath: str):
    """ Function call: set_up_location_database(path to ASN database file)

    This function does whatever magic is necessary to set up a database object
    for resolving IP addresses into Autonomous Systems. At the moment, IPFire
    location (https://location.ipfire.org/) is supported only, and we rely on
    another script to fetch and update it.

    The database object created is returned.
    """

    # Only needed here, for building a timezone-aware timestamp below
    from datetime import timezone

    # Initialise an libloc database object...
    LOGIT.debug("Setting up location database from %s ...", dbpath)
    dbobject = location.Database(dbpath)

    # datetime.utcfromtimestamp() is deprecated as of Python 3.12, so the creation
    # timestamp is converted into a timezone-aware UTC object instead. The "%c"
    # representation is the same for both.
    timestamp = datetime.fromtimestamp(dbobject.created_at, tz=timezone.utc).strftime("%c")
    vendor = dbobject.vendor
    LOGIT.debug("Successfully loaded location database from %s generated '%s' (UTC/GMT) by '%s' - good",
                dbpath, timestamp, vendor)

    # Return the database object generated...
    return dbobject


# Read the configuration file and run sanity tests against its contents. Any
# failure is logged, answered with "BH" on STDOUT, and terminates this script
# with exit code 127.
if os.path.isfile(CFILE) and not os.path.islink(CFILE):
    LOGIT.debug("Attempting to read configuration from '%s' ...", CFILE)

    if os.access(CFILE, os.W_OK) or os.access(CFILE, os.X_OK):
        LOGIT.error("Supplied configuration file '%s' is writeable or executable, aborting", CFILE)
        print("BH")
        sys.exit(127)

    config = configparser.ConfigParser()

    with open(CFILE, "r") as fptr:
        config.read_file(fptr)

    LOGIT.debug("Read configuration from '%s', performing sanity tests...", CFILE)

    # Attempt to read mandatory configuration parameters and see if they contain
    # useful values, if possible to determine.
    try:
        if config["GENERAL"]["LOGLEVEL"].upper() not in ["DEBUG", "INFO", "WARNING", "ERROR"]:
            raise ValueError("log level configuration invalid")

        # The ASN database must be a regular file, and not a symbolic link
        # (the link test has to look at the database path itself)...
        if not os.path.isfile(config["GENERAL"]["ASNDB_PATH"]) or os.path.islink(config["GENERAL"]["ASNDB_PATH"]):
            raise ValueError("configured ASN database path is not a file")

        if os.access(config["GENERAL"]["ASNDB_PATH"], os.W_OK) or os.access(config["GENERAL"]["ASNDB_PATH"], os.X_OK):
            raise ValueError("configured ASN database path is writeable or executable")

        if config.getint("GENERAL", "AS_DIVERSITY_THRESHOLD") not in range(2, 10):
            raise ValueError("ASN diversity threshold configured out of bounds")

        for singleckey in ["BLOCK_DIVERSITY_EXCEEDING_DESTINATIONS",
                           "BLOCK_SUSPECTED_SELECTIVE_ANNOUNCEMENTS",
                           "USE_REPLYMAP"]:
            # getboolean() raises a ValueError on its own if the value is not a
            # boolean, which is translated into a message naming the offending key.
            try:
                config.getboolean("GENERAL", singleckey)
            except ValueError:
                raise ValueError("[\"GENERAL\"][\"" + singleckey + "\"] configuration invalid") from None

        # In case replymap support is enabled, check if corresponding strings exist
        # for enabled anomaly detection modes...
        if config.getboolean("GENERAL", "USE_REPLYMAP"):

            if config.getboolean("GENERAL", "BLOCK_DIVERSITY_EXCEEDING_DESTINATIONS"):
                if not config["GENERAL"]["REPLYSTRING_DIVERSITY_EXCEEDING_DESTINATIONS"]:
                    raise ValueError("replymap string for Fast Flux anomaly detection is missing")

            if config.getboolean("GENERAL", "BLOCK_SUSPECTED_SELECTIVE_ANNOUNCEMENTS"):
                if not config["GENERAL"]["REPLYMAP_SUSPECTED_SELECTIVE_ANNOUNCEMENTS"]:
                    raise ValueError("replymap string for selective announced anomaly detection is missing")

        if not config["GENERAL"]["TESTDATA"]:
            raise ValueError("no ASNDB testing data configured")

        if config["GENERAL"]["ACTIVE_ASNBLS"]:

            for scasnbl in config["GENERAL"]["ACTIVE_ASNBLS"].split():
                if not config[scasnbl]:
                    raise ValueError("configuration section for active ASNBL " + scasnbl + " missing")

                if config[scasnbl]["TYPE"].lower() == "dns":
                    if not is_valid_domain(config[scasnbl]["FQDN"]):
                        raise ValueError("no valid FQDN given for active ASNBL " + scasnbl)
                elif config[scasnbl]["TYPE"].lower() == "file":
                    # Same as for the ASN database: regular files only, no symbolic links
                    if not os.path.isfile(config[scasnbl]["PATH"]) or os.path.islink(config[scasnbl]["PATH"]):
                        raise ValueError("configured ASNBL file for active ASNBL " + scasnbl +
                                         " is not a file")

                    if os.access(config[scasnbl]["PATH"], os.W_OK) or os.access(config[scasnbl]["PATH"], os.X_OK):
                        raise ValueError("configured ASNBL file for active ASNBL " + scasnbl +
                                         " is writeable or executable")
                else:
                    raise ValueError("invalid type for active ASNBL " + scasnbl)

        else:
            # It is possible to run this helper script without any ASN database, detecting
            # Fast Flux and/or selectively announced network setups only. In this case, at
            # least one of these must be enabled, otherwise this would NOOP silently...

            if not (config.getboolean("GENERAL", "BLOCK_DIVERSITY_EXCEEDING_DESTINATIONS") or config.getboolean("GENERAL", "BLOCK_SUSPECTED_SELECTIVE_ANNOUNCEMENTS")):
                raise ValueError("Neither ASNBLs nor Fast Flux or selectively announced network " +
                                 "detection enabled - this would result in running as a NOOP, please adjust " +
                                 "your configuration file or remove this helper from your Squid configuration")

            LOGIT.warning("No ASNBL configured. This is acceptable as long as this script is configured to do anything, you just have been warned...")

    except (KeyError, ValueError, configparser.Error) as error:
        # configparser.Error covers options missing for getint()/getboolean(),
        # which raise NoOptionError rather than KeyError.
        LOGIT.error("Configuration sanity tests failed: %s", error)
        print("BH")
        sys.exit(127)

    LOGIT.debug("Configuation sanity tests passed, good, processing...")

    # Apply configured logging level to avoid INFO/DEBUG clutter (thanks, cf5cec3a)...
    LOGIT.setLevel({"DEBUG": logging.DEBUG,
                    "INFO": logging.INFO,
                    "WARNING": logging.WARNING,
                    "ERROR": logging.ERROR}[config["GENERAL"]["LOGLEVEL"].upper()])

else:
    LOGIT.error("Supplied configuraion file path '%s' is not a file", CFILE)
    print("BH")
    sys.exit(127)

# Placeholders for ASNBL sources (files, FQDNs) and read contents...
ASNBLDOMAINS = []
ASNBLFILES = []
ASNLIST = []

# Sort active ASNBLs by their type. The type is compared case-insensitively,
# in the same way the configuration sanity tests do, so a type such as "DNS"
# accepted there is accepted here as well.
for scasnbl in config["GENERAL"]["ACTIVE_ASNBLS"].split():
    if config[scasnbl]["TYPE"].lower() == "file":
        ASNBLFILES.append(config[scasnbl]["PATH"])
    elif config[scasnbl]["TYPE"].lower() == "dns":
        # Ensure the FQDN ends with exactly one trailing dot
        ASNBLDOMAINS.append(config[scasnbl]["FQDN"].strip(".") + ".")
    else:
        # This should not happen as invalid ASNBL types were caught before,
        # but we will never know...
        LOGIT.error("Detected invalid type '%s' while processing active ASNBL '%s'. This should not happen, bailing!",
                    config[scasnbl]["TYPE"], scasnbl)
        print("BH")
        sys.exit(127)

# Set up ASN database...
ASNDB = set_up_location_database(config["GENERAL"]["ASNDB_PATH"])

# Refuse to operate on an ASN database not returning the expected test results...
LOGIT.debug("Running ASN database response tests...")
if asndb_response_tests(config["GENERAL"]["TESTDATA"]):
    LOGIT.debug("ASN database operational - excellent")
else:
    LOGIT.error("ASN database response tests failed, aborting")
    print("BH")
    sys.exit(127)

# Read contents from given ASNBL files...
if ASNBLFILES:
    for singlefile in ASNBLFILES:
        ASNLIST.extend(load_asnbl_file(singlefile))
        LOGIT.debug("Successfully read ASN list from %s, %s entries by now", singlefile, len(ASNLIST))

    LOGIT.debug("Successfully read all configured ASN lists, %s entries in total", len(ASNLIST))

# Read domains or IP addresses from STDIN in a while loop, resolve IP
# addresses if necessary, and do ASN lookups against the ASN database for
# each IP address. Query all specified ASN black-/whitelists afterwards,
# return OK if a ASN hits, or ERR if none was found.
#
# Only the first whitespace-separated field of each input line is processed.
# The loop ends as soon as STDIN is closed or a line without content is read.
LOGIT.debug("Initialization phase completed successfully, entering input loop...")
while True:
    try:
        INPUTFIELDS = sys.stdin.readline().split()
    except KeyboardInterrupt:
        sys.exit(127)

    # Abort if no STDIN input was received (end of file, or an empty line)
    if not INPUTFIELDS:
        break

    QUERYSTRING = INPUTFIELDS[0]

    # Check if input is an IP address or a valid domain, and return "ERR"
    # if none matches. In case of domains, resolve corresponding IP addresses
    if is_ipaddress(QUERYSTRING):
        IPS = [QUERYSTRING]
    elif is_valid_domain(QUERYSTRING):
        IPS = resolve_addresses(QUERYSTRING.strip(".") + ".")

        # Test if any IP address was successfully resolved for given destination...
        if not IPS:
            # ... if not, we'll return ERR instead of BH, since the latter one causes Squid
            # to display "permission denied" messages to the client, which is confusing.
            # Further, it may allow authenticated users to run a DoS against Squid, since
            # too many BH's per time are considered as a problem, eventually causing Squid
            # to terminate itself.
            #
            # ERR is considered to be safe here, as Squid won't be able to establish a
            # connection anyway, no matter whether the destination is blacklisted or not,
            # provided both Squid and this script use the same DNS resolver.
            LOGIT.info("Unable to resolve A/AAAA record of queried destination '%s', returning ERR...",
                       QUERYSTRING)
            print("ERR")
            continue
    else:
        # Same as above: Returning BH here opens up a DoS vector, although it would be
        # more appropriate...
        LOGIT.info("queried destination '%s' is neither a valid FQDN nor an IP address, returning 'ERR'",
                   QUERYSTRING)
        print("ERR")
        continue

    # Enumerate ASN for each IP address in $IPS...
    ASNS = []
    for singleip in IPS:
        # Enumerate ASN for this IP address...
        resolvedasn = resolve_asn(singleip)

        # In case protection against destinations without public AS announcements for their
        # IP addresses is desired, the query will be denied in case ASN = 0 appears in an
        # resolve_asn() return value...
        #
        # This check will ignore IPv6 addresses within 2002::/16, as specified for 6to4 in
        # RFC 3068. There is no globally valid announcement for this space, hence libloc
        # won't return any ASN on purpose.
        if resolvedasn == 0:
            if ipaddress.ip_address(singleip) in ipaddress.ip_network("2002::/16"):
                # In case of 6to4 IPv6 addresses, insert an arbitrary ASN to prevent sites
                # hosted only within this CIDR becoming unreachable.
                resolvedasn = 64496
            else:
                LOGIT.warning("Destination '%s' resolves to IP addresses '%s' without corresponding ASN, probably selectively announced",
                              QUERYSTRING, singleip)

        # Do not append failed lookup results (ASN < 0 or empty or not an integer) or
        # duplicate entries as they do not contribute to Fast Flux detection. In order to
        # detect selective announcements, 0 will be preserved, which is considered to be
        # safe in terms of Fast Flux handling as well...
        #
        # The type of the value itself is tested here, so anything that is not an
        # integer is skipped before it gets compared.
        if isinstance(resolvedasn, int) and resolvedasn > -1 and resolvedasn not in ASNS:
            ASNS.append(resolvedasn)

    # Return BH if no ASNs were enumerated by the for loop above...
    if not ASNS:
        print("BH")
        continue

    # Opening part of the message returned to Squid if replymap support is enabled...
    if config.getboolean("GENERAL", "USE_REPLYMAP"):
        replystring = "message=\""

    # Deny access to destinations without public AS announcements for their IP addresses...
    if 0 in ASNS and config.getboolean("GENERAL", "BLOCK_SUSPECTED_SELECTIVE_ANNOUNCEMENTS"):
        LOGIT.info("Denying access to destination '%s' due to suspected selective announcements",
                   QUERYSTRING)

        if config.getboolean("GENERAL", "USE_REPLYMAP"):
            replystring = replystring + config["GENERAL"]["REPLYMAP_SUSPECTED_SELECTIVE_ANNOUNCEMENTS"]
            print("OK", replystring.strip() + "\"")
        else:
            print("OK")

        continue

    # Primitive Fast Flux mitigation: If a destination resolves to
    # different IP addresses within too many different ASNs (normally 1-4),
    # it may be considered as C&C/Fast Flux domain.
    #
    # Depending on the configuration set at the beginning of this
    # script, this is ignored or access will be denied.
    if len(ASNS) > config.getint("GENERAL", "AS_DIVERSITY_THRESHOLD"):

        # Sort ASN list for convenience...
        ASNS.sort()

        LOGIT.warning("Destination '%s' exceeds ASN diversity threshold (%s > %s), possibly Fast Flux: %s",
                      QUERYSTRING, len(ASNS), config["GENERAL"]["AS_DIVERSITY_THRESHOLD"], ASNS)

        if config.getboolean("GENERAL", "BLOCK_DIVERSITY_EXCEEDING_DESTINATIONS"):
            LOGIT.info("Denying access to possible Fast Flux destination '%s'",
                       QUERYSTRING)

            if config.getboolean("GENERAL", "USE_REPLYMAP"):
                replystring = replystring + config["GENERAL"]["REPLYSTRING_DIVERSITY_EXCEEDING_DESTINATIONS"]
                print("OK", replystring.strip() + "\"")
            else:
                print("OK")

            continue

    # Query enumerated ASNs against specified black-/whitelist sources, if enabled...
    if config["GENERAL"]["ACTIVE_ASNBLS"]:
        qfailed = True
        for singleasn in ASNS:
            try:
                if check_asn_against_list(singleasn, QUERYSTRING, ASNBLDOMAINS, ASNLIST):
                    qfailed = False
                    print("OK")
                    break
            except socket.gaierror:
                # Return "BH" in case of DNS failures not indicating a non-listed AS...
                qfailed = False
                print("BH")
                break

        # None of the ASNs was listed anywhere...
        if qfailed:
            print("ERR")

    else:
        print("ERR")

# EOF
