#!/usr/bin/env python3
# Unix SMB/CIFS implementation.
# Copyright (C) Stefan Metzmacher 2020
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import sys
import os

sys.path.insert(0, "bin/python")
os.environ["PYTHONUNBUFFERED"] = "1"

import functools
import time

from samba import dsdb, ntstatus
from samba.dcerpc import krb5pac, lsa, security

from samba.tests import env_get_var_value
from samba.tests.krb5.kcrypto import Cksumtype, Enctype
from samba.tests.krb5.kdc_base_test import KDCBaseTest
from samba.tests.krb5.raw_testcase import (
    RawKerberosTest,
    RodcPacEncryptionKey,
    ZeroedChecksumKey
)
from samba.tests.krb5.rfc4120_constants import (
    AES256_CTS_HMAC_SHA1_96,
    AD_IF_RELEVANT,
    ARCFOUR_HMAC_MD5,
    KDC_ERR_BADMATCH,
    KDC_ERR_BADOPTION,
    KDC_ERR_BAD_INTEGRITY,
    KDC_ERR_GENERIC,
    KDC_ERR_INAPP_CKSUM,
    KDC_ERR_MODIFIED,
    KDC_ERR_SUMTYPE_NOSUPP,
    KDC_ERR_TGT_REVOKED,
    KU_PA_ENC_TIMESTAMP,
    KU_AS_REP_ENC_PART,
    KU_TGS_REP_ENC_PART_SUB_KEY,
    KU_TGS_REQ_AUTH_DAT_SESSION,
    KU_TGS_REQ_AUTH_DAT_SUBKEY,
    NT_PRINCIPAL,
)
import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1

SidType = RawKerberosTest.SidType

global_asn1_print = False
global_hexdump = False


class S4UKerberosTests(KDCBaseTest):

    default_attrs = security.SE_GROUP_DEFAULT_FLAGS

    def setUp(self):
        super(S4UKerberosTests, self).setUp()
        self.do_asn1_print = global_asn1_print
        self.do_hexdump = global_hexdump

    def _test_s4u2self(self, pa_s4u2self_ctype=None):
        service_creds = self.get_service_creds()
        service = service_creds.get_username()
        realm = service_creds.get_realm()

        cname = self.PrincipalName_create(name_type=1, names=[service])
        sname = self.PrincipalName_create(name_type=2, names=["krbtgt", realm])

        till = self.get_KerberosTime(offset=36000)

        kdc_options = krb5_asn1.KDCOptions('forwardable')
        padata = None

        etypes = (18, 17, 23)

        req = self.AS_REQ_create(padata=padata,
                                 kdc_options=str(kdc_options),
                                 cname=cname,
                                 realm=realm,
                                 sname=sname,
                                 from_time=None,
                                 till_time=till,
                                 renew_time=None,
                                 nonce=0x7fffffff,
                                 etypes=etypes,
                                 addresses=None,
                                 additional_tickets=None)
        rep = self.send_recv_transaction(req)
        self.assertIsNotNone(rep)

        self.assertEqual(rep['msg-type'], 30)
        self.assertEqual(rep['error-code'], 25)
        rep_padata = self.der_decode(
            rep['e-data'], asn1Spec=krb5_asn1.METHOD_DATA())

        for pa in rep_padata:
            if pa['padata-type'] == 19:
                etype_info2 = pa['padata-value']
                break

        etype_info2 = self.der_decode(
            etype_info2, asn1Spec=krb5_asn1.ETYPE_INFO2())

        key = self.PasswordKey_from_etype_info2(service_creds, etype_info2[0])

        (patime, pausec) = self.get_KerberosTimeWithUsec()
        pa_ts = self.PA_ENC_TS_ENC_create(patime, pausec)
        pa_ts = self.der_encode(pa_ts, asn1Spec=krb5_asn1.PA_ENC_TS_ENC())

        pa_ts = self.EncryptedData_create(key, KU_PA_ENC_TIMESTAMP, pa_ts)
        pa_ts = self.der_encode(pa_ts, asn1Spec=krb5_asn1.EncryptedData())

        pa_ts = self.PA_DATA_create(2, pa_ts)

        kdc_options = krb5_asn1.KDCOptions('forwardable')
        padata = [pa_ts]

        req = self.AS_REQ_create(padata=padata,
                                 kdc_options=str(kdc_options),
                                 cname=cname,
                                 realm=realm,
                                 sname=sname,
                                 from_time=None,
                                 till_time=till,
                                 renew_time=None,
                                 nonce=0x7fffffff,
                                 etypes=etypes,
                                 addresses=None,
                                 additional_tickets=None)
        rep = self.send_recv_transaction(req)
        self.assertIsNotNone(rep)

        msg_type = rep['msg-type']
        self.assertEqual(msg_type, 11)

        enc_part2 = key.decrypt(KU_AS_REP_ENC_PART, rep['enc-part']['cipher'])
        # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
        # application tag 26
        try:
            enc_part2 = self.der_decode(
                enc_part2, asn1Spec=krb5_asn1.EncASRepPart())
        except Exception:
            enc_part2 = self.der_decode(
                enc_part2, asn1Spec=krb5_asn1.EncTGSRepPart())

        # S4U2Self Request
        sname = cname

        for_user_name = env_get_var_value('FOR_USER')
        uname = self.PrincipalName_create(name_type=1, names=[for_user_name])

        kdc_options = krb5_asn1.KDCOptions('forwardable')
        till = self.get_KerberosTime(offset=36000)
        ticket = rep['ticket']
        ticket_session_key = self.EncryptionKey_import(enc_part2['key'])
        pa_s4u = self.PA_S4U2Self_create(name=uname, realm=realm,
                                         tgt_session_key=ticket_session_key,
                                         ctype=pa_s4u2self_ctype)
        padata = [pa_s4u]

        subkey = self.RandomKey(ticket_session_key.etype)

        (ctime, cusec) = self.get_KerberosTimeWithUsec()

        req = self.TGS_REQ_create(padata=padata,
                                  cusec=cusec,
                                  ctime=ctime,
                                  ticket=ticket,
                                  kdc_options=str(kdc_options),
                                  cname=cname,
                                  realm=realm,
                                  sname=sname,
                                  from_time=None,
                                  till_time=till,
                                  renew_time=None,
                                  nonce=0x7ffffffe,
                                  etypes=etypes,
                                  addresses=None,
                                  EncAuthorizationData=None,
                                  EncAuthorizationData_key=None,
                                  additional_tickets=None,
                                  ticket_session_key=ticket_session_key,
                                  authenticator_subkey=subkey)
        rep = self.send_recv_transaction(req)
        self.assertIsNotNone(rep)

        msg_type = rep['msg-type']
        if msg_type == 13:
            enc_part2 = subkey.decrypt(
                KU_TGS_REP_ENC_PART_SUB_KEY, rep['enc-part']['cipher'])
            enc_part2 = self.der_decode(
                enc_part2, asn1Spec=krb5_asn1.EncTGSRepPart())

        return msg_type

    # Using the checksum type from the tgt_session_key happens to work
    # everywhere
    def test_s4u2self(self):
        msg_type = self._test_s4u2self()
        self.assertEqual(msg_type, 13)

    # Per spec, the checksum of PA-FOR-USER is HMAC_MD5, see [MS-SFU] 2.2.1
    def test_s4u2self_hmac_md5_checksum(self):
        msg_type = self._test_s4u2self(pa_s4u2self_ctype=Cksumtype.HMAC_MD5)
        self.assertEqual(msg_type, 13)

    def test_s4u2self_md5_unkeyed_checksum(self):
        msg_type = self._test_s4u2self(pa_s4u2self_ctype=Cksumtype.MD5)
        self.assertEqual(msg_type, 30)

    def test_s4u2self_sha1_unkeyed_checksum(self):
        msg_type = self._test_s4u2self(pa_s4u2self_ctype=Cksumtype.SHA1)
        self.assertEqual(msg_type, 30)

    def test_s4u2self_crc32_unkeyed_checksum(self):
        msg_type = self._test_s4u2self(pa_s4u2self_ctype=Cksumtype.CRC32)
        self.assertEqual(msg_type, 30)

    def _run_s4u2self_test(self, kdc_dict):
        client_opts = kdc_dict.pop('client_opts', None)
        client_creds = self.get_cached_creds(
            account_type=self.AccountType.USER,
            opts=client_opts)

        service_opts = kdc_dict.pop('service_opts', None)
        service_creds = self.get_cached_creds(
            account_type=self.AccountType.COMPUTER,
            opts=service_opts)

        service_tgt = self.get_tgt(service_creds)
        modify_service_tgt_fn = kdc_dict.pop('modify_service_tgt_fn', None)
        if modify_service_tgt_fn is not None:
            service_tgt = modify_service_tgt_fn(service_tgt)

        client_name = client_creds.get_username()
        client_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
                                                 names=[client_name])

        service_name = kdc_dict.pop('service_name', None)
        if service_name is None:
            service_name = service_creds.get_username()[:-1]
        service_sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
                                                  names=['host', service_name])

        realm = client_creds.get_realm()

        expected_flags = kdc_dict.pop('expected_flags', None)
        if expected_flags is not None:
            expected_flags = krb5_asn1.TicketFlags(expected_flags)

        unexpected_flags = kdc_dict.pop('unexpected_flags', None)
        if unexpected_flags is not None:
            unexpected_flags = krb5_asn1.TicketFlags(unexpected_flags)

        expected_error_mode = kdc_dict.pop('expected_error_mode', 0)
        expect_status = kdc_dict.pop('expect_status', None)
        expected_status = kdc_dict.pop('expected_status', None)
        if expected_error_mode:
            check_error_fn = self.generic_check_kdc_error
            check_rep_fn = None
        else:
            check_error_fn = None
            check_rep_fn = self.generic_check_kdc_rep

            self.assertIsNone(expect_status)
            self.assertIsNone(expected_status)

        kdc_options = kdc_dict.pop('kdc_options', '0')
        kdc_options = krb5_asn1.KDCOptions(kdc_options)

        service_decryption_key = self.TicketDecryptionKey_from_creds(
            service_creds)

        authenticator_subkey = self.RandomKey(Enctype.AES256)

        etypes = kdc_dict.pop('etypes', (AES256_CTS_HMAC_SHA1_96,
                                         ARCFOUR_HMAC_MD5))

        expect_edata = kdc_dict.pop('expect_edata', None)
        expected_groups = kdc_dict.pop('expected_groups', None)
        unexpected_groups = kdc_dict.pop('unexpected_groups', None)

        def generate_s4u2self_padata(_kdc_exchange_dict,
                                     _callback_dict,
                                     req_body):
            pa_s4u = self.PA_S4U2Self_create(
                name=client_cname,
                realm=realm,
                tgt_session_key=service_tgt.session_key,
                ctype=None)

            return [pa_s4u], req_body

        kdc_exchange_dict = self.tgs_exchange_dict(
            expected_crealm=realm,
            expected_cname=client_cname,
            expected_srealm=realm,
            expected_sname=service_sname,
            expected_account_name=client_name,
            expected_groups=expected_groups,
            unexpected_groups=unexpected_groups,
            expected_sid=client_creds.get_sid(),
            expected_flags=expected_flags,
            unexpected_flags=unexpected_flags,
            ticket_decryption_key=service_decryption_key,
            expect_ticket_checksum=True,
            generate_padata_fn=generate_s4u2self_padata,
            check_error_fn=check_error_fn,
            check_rep_fn=check_rep_fn,
            check_kdc_private_fn=self.generic_check_kdc_private,
            expected_error_mode=expected_error_mode,
            expect_status=expect_status,
            expected_status=expected_status,
            tgt=service_tgt,
            authenticator_subkey=authenticator_subkey,
            kdc_options=str(kdc_options),
            expect_edata=expect_edata)

        self._generic_kdc_exchange(kdc_exchange_dict,
                                   cname=None,
                                   realm=realm,
                                   sname=service_sname,
                                   etypes=etypes)

        if not expected_error_mode:
            # Check that the ticket contains a PAC.
            ticket = kdc_exchange_dict['rep_ticket_creds']

            pac = self.get_ticket_pac(ticket)
            self.assertIsNotNone(pac)

        # Ensure we used all the parameters given to us.
        self.assertEqual({}, kdc_dict)

    # Test performing an S4U2Self operation with a forwardable ticket. The
    # resulting ticket should have the 'forwardable' flag set.
    def test_s4u2self_forwardable(self):
        self._run_s4u2self_test(
            {
                'client_opts': {
                    'not_delegated': False
                },
                'kdc_options': 'forwardable',
                'modify_service_tgt_fn': functools.partial(
                    self.set_ticket_forwardable, flag=True),
                'expected_flags': 'forwardable'
            })

    # Test performing an S4U2Self operation with a forwardable ticket that does
    # not contain a PAC. The request should fail.
    def test_s4u2self_no_pac(self):
        def forwardable_no_pac(ticket):
            ticket = self.set_ticket_forwardable(ticket, flag=True)
            return self.remove_ticket_pac(ticket)

        self._run_s4u2self_test(
            {
                'expected_error_mode': KDC_ERR_TGT_REVOKED,
                'client_opts': {
                    'not_delegated': False
                },
                'kdc_options': 'forwardable',
                'modify_service_tgt_fn': forwardable_no_pac,
                'expected_flags': 'forwardable',
                'expect_edata': False
            })

    # Test performing an S4U2Self operation without requesting a forwardable
    # ticket. The resulting ticket should not have the 'forwardable' flag set.
    def test_s4u2self_without_forwardable(self):
        self._run_s4u2self_test(
            {
                'client_opts': {
                    'not_delegated': False
                },
                'modify_service_tgt_fn': functools.partial(
                    self.set_ticket_forwardable, flag=True),
                'unexpected_flags': 'forwardable'
            })

    # Do an S4U2Self with a non-forwardable TGT. The 'forwardable' flag should
    # not be set on the ticket.
    def test_s4u2self_not_forwardable(self):
        self._run_s4u2self_test(
            {
                'client_opts': {
                    'not_delegated': False
                },
                'kdc_options': 'forwardable',
                'modify_service_tgt_fn': functools.partial(
                    self.set_ticket_forwardable, flag=False),
                'unexpected_flags': 'forwardable'
            })

    # Do an S4U2Self with the not_delegated flag set on the client. The
    # 'forwardable' flag should not be set on the ticket.
    def test_s4u2self_client_not_delegated(self):
        self._run_s4u2self_test(
            {
                'client_opts': {
                    'not_delegated': True
                },
                'kdc_options': 'forwardable',
                'modify_service_tgt_fn': functools.partial(
                    self.set_ticket_forwardable, flag=True),
                'unexpected_flags': 'forwardable'
            })

    # Do an S4U2Self with a service not trusted to authenticate for delegation,
    # but having an empty msDS-AllowedToDelegateTo attribute. The 'forwardable'
    # flag should be set on the ticket.
    def test_s4u2self_not_trusted_empty_allowed(self):
        self._run_s4u2self_test(
            {
                'client_opts': {
                    'not_delegated': False
                },
                'service_opts': {
                    'trusted_to_auth_for_delegation': False,
                    'delegation_to_spn': ()
                },
                'kdc_options': 'forwardable',
                'modify_service_tgt_fn': functools.partial(
                    self.set_ticket_forwardable, flag=True),
                'expected_flags': 'forwardable'
            })

    # Do an S4U2Self with a service not trusted to authenticate for delegation
    # and having a non-empty msDS-AllowedToDelegateTo attribute. The
    # 'forwardable' flag should not be set on the ticket.
    def test_s4u2self_not_trusted_nonempty_allowed(self):
        self._run_s4u2self_test(
            {
                'client_opts': {
                    'not_delegated': False
                },
                'service_opts': {
                    'trusted_to_auth_for_delegation': False,
                    'delegation_to_spn': ('test',)
                },
                'kdc_options': 'forwardable',
                'modify_service_tgt_fn': functools.partial(
                    self.set_ticket_forwardable, flag=True),
                'unexpected_flags': 'forwardable'
            })

    # Do an S4U2Self with a service trusted to authenticate for delegation and
    # having an empty msDS-AllowedToDelegateTo attribute. The 'forwardable'
    # flag should be set on the ticket.
    def test_s4u2self_trusted_empty_allowed(self):
        self._run_s4u2self_test(
            {
                'client_opts': {
                    'not_delegated': False
                },
                'service_opts': {
                    'trusted_to_auth_for_delegation': True,
                    'delegation_to_spn': ()
                },
                'kdc_options': 'forwardable',
                'modify_service_tgt_fn': functools.partial(
                    self.set_ticket_forwardable, flag=True),
                'expected_flags': 'forwardable'
            })

    # Do an S4U2Self with a service trusted to authenticate for delegation and
    # having a non-empty msDS-AllowedToDelegateTo attribute. The 'forwardable'
    # flag should be set on the ticket.
    def test_s4u2self_trusted_nonempty_allowed(self):
        self._run_s4u2self_test(
            {
                'client_opts': {
                    'not_delegated': False
                },
                'service_opts': {
                    'trusted_to_auth_for_delegation': True,
                    'delegation_to_spn': ('test',)
                },
                'kdc_options': 'forwardable',
                'modify_service_tgt_fn': functools.partial(
                    self.set_ticket_forwardable, flag=True),
                'expected_flags': 'forwardable'
            })

    # Do an S4U2Self with the sname in the request different to that of the
    # service. We expect an error.
    def test_s4u2self_wrong_sname(self):
        other_creds = self.get_cached_creds(
            account_type=self.AccountType.COMPUTER,
            opts={
                'trusted_to_auth_for_delegation': True,
                'id': 0
            })
        other_sname = other_creds.get_username()[:-1]

        self._run_s4u2self_test(
            {
                'expected_error_mode': KDC_ERR_BADMATCH,
                'expect_edata': False,
                'client_opts': {
                    'not_delegated': False
                },
                'service_opts': {
                    'trusted_to_auth_for_delegation': True
                },
                'service_name': other_sname,
                'kdc_options': 'forwardable',
                'modify_service_tgt_fn': functools.partial(
                    self.set_ticket_forwardable, flag=True)
            })

    # Do an S4U2Self where the service does not require authorization data. The
    # resulting ticket should still contain a PAC.
    def test_s4u2self_no_auth_data_required(self):
        self._run_s4u2self_test(
            {
                'client_opts': {
                    'not_delegated': False
                },
                'service_opts': {
                    'trusted_to_auth_for_delegation': True,
                    'no_auth_data_required': True
                },
                'kdc_options': 'forwardable',
                'modify_service_tgt_fn': functools.partial(
                    self.set_ticket_forwardable, flag=True),
                'expected_flags': 'forwardable'
            })

    # Do an S4U2Self an check that the service asserted identity is part of
    # the sids.
    def test_s4u2self_asserted_identity(self):
        self._run_s4u2self_test(
            {
                'client_opts': {
                    'not_delegated': False
                },
                'expected_groups': {
                    (security.SID_SERVICE_ASSERTED_IDENTITY,
                     SidType.EXTRA_SID,
                     self.default_attrs),
                    ...
                },
                'unexpected_groups': {
                    security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY,
                },
            })

    def _run_delegation_test(self, kdc_dict):
        s4u2self = kdc_dict.pop('s4u2self', False)

        authtime_delay = kdc_dict.pop('authtime_delay', 0)

        client_opts = kdc_dict.pop('client_opts', None)
        client_creds = self.get_cached_creds(
            account_type=self.AccountType.USER,
            opts=client_opts)

        sid = client_creds.get_sid()

        service1_opts = kdc_dict.pop('service1_opts', {})
        service2_opts = kdc_dict.pop('service2_opts', {})

        allow_delegation = kdc_dict.pop('allow_delegation', False)
        allow_rbcd = kdc_dict.pop('allow_rbcd', False)
        self.assertFalse(allow_delegation and allow_rbcd)

        if allow_rbcd:
            service1_creds = self.get_cached_creds(
                account_type=self.AccountType.COMPUTER,
                opts=service1_opts)

            self.assertNotIn('delegation_from_dn', service2_opts)
            service2_opts['delegation_from_dn'] = str(service1_creds.get_dn())

            service2_creds = self.get_cached_creds(
                account_type=self.AccountType.COMPUTER,
                opts=service2_opts)
        else:
            service2_creds = self.get_cached_creds(
                account_type=self.AccountType.COMPUTER,
                opts=service2_opts)

            if allow_delegation:
                self.assertNotIn('delegation_to_spn', service1_opts)
                service1_opts['delegation_to_spn'] = service2_creds.get_spn()

            service1_creds = self.get_cached_creds(
                account_type=self.AccountType.COMPUTER,
                opts=service1_opts)

        service1_tgt = self.get_tgt(service1_creds)
        self.assertElementPresent(service1_tgt.ticket_private, 'authtime')
        service1_tgt_authtime = self.getElementValue(service1_tgt.ticket_private, 'authtime')

        client_username = client_creds.get_username()
        client_realm = client_creds.get_realm()
        client_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
                                                 names=[client_username])

        service1_name = service1_creds.get_username()[:-1]
        service1_realm = service1_creds.get_realm()
        service1_service = 'host'
        service1_sname = self.PrincipalName_create(
            name_type=NT_PRINCIPAL, names=[service1_service,
                                           service1_name])
        service1_decryption_key = self.TicketDecryptionKey_from_creds(
            service1_creds)

        expect_pac = kdc_dict.pop('expect_pac', True)

        expected_groups = kdc_dict.pop('expected_groups', None)
        unexpected_groups = kdc_dict.pop('unexpected_groups', None)

        client_tkt_options = kdc_dict.pop('client_tkt_options', 'forwardable')
        expected_flags = krb5_asn1.TicketFlags(client_tkt_options)

        etypes = kdc_dict.pop('etypes', (AES256_CTS_HMAC_SHA1_96,
                                         ARCFOUR_HMAC_MD5))

        if s4u2self:
            self.assertEqual(authtime_delay, 0)

            def generate_s4u2self_padata(_kdc_exchange_dict,
                                         _callback_dict,
                                         req_body):
                pa_s4u = self.PA_S4U2Self_create(
                    name=client_cname,
                    realm=client_realm,
                    tgt_session_key=service1_tgt.session_key,
                    ctype=None)

                return [pa_s4u], req_body

            s4u2self_expected_flags = krb5_asn1.TicketFlags('forwardable')
            s4u2self_unexpected_flags = krb5_asn1.TicketFlags('0')

            s4u2self_kdc_options = krb5_asn1.KDCOptions('forwardable')

            s4u2self_authenticator_subkey = self.RandomKey(Enctype.AES256)
            s4u2self_kdc_exchange_dict = self.tgs_exchange_dict(
                expected_crealm=client_realm,
                expected_cname=client_cname,
                expected_srealm=service1_realm,
                expected_sname=service1_sname,
                expected_account_name=client_username,
                expected_groups=expected_groups,
                unexpected_groups=unexpected_groups,
                expected_sid=sid,
                expected_flags=s4u2self_expected_flags,
                unexpected_flags=s4u2self_unexpected_flags,
                ticket_decryption_key=service1_decryption_key,
                generate_padata_fn=generate_s4u2self_padata,
                check_rep_fn=self.generic_check_kdc_rep,
                check_kdc_private_fn=self.generic_check_kdc_private,
                tgt=service1_tgt,
                authenticator_subkey=s4u2self_authenticator_subkey,
                kdc_options=str(s4u2self_kdc_options),
                expect_edata=False)

            self._generic_kdc_exchange(s4u2self_kdc_exchange_dict,
                                       cname=None,
                                       realm=service1_realm,
                                       sname=service1_sname,
                                       etypes=etypes)

            client_service_tkt = s4u2self_kdc_exchange_dict['rep_ticket_creds']
        else:
            if authtime_delay != 0:
                time.sleep(authtime_delay)
                fresh = True
            else:
                fresh = False

            client_tgt = self.get_tgt(client_creds,
                                      kdc_options=client_tkt_options,
                                      expected_flags=expected_flags,
                                      fresh=fresh)
            client_service_tkt = self.get_service_ticket(
                client_tgt,
                service1_creds,
                kdc_options=client_tkt_options,
                expected_flags=expected_flags,
                fresh=fresh)

        modify_client_tkt_fn = kdc_dict.pop('modify_client_tkt_fn', None)
        if modify_client_tkt_fn is not None:
            client_service_tkt = modify_client_tkt_fn(client_service_tkt)

        self.assertElementPresent(client_service_tkt.ticket_private, 'authtime')
        expected_authtime = self.getElementValue(client_service_tkt.ticket_private, 'authtime')
        if authtime_delay > 1:
            self.assertNotEqual(expected_authtime, service1_tgt_authtime)

        additional_tickets = [client_service_tkt.ticket]

        modify_service_tgt_fn = kdc_dict.pop('modify_service_tgt_fn', None)
        if modify_service_tgt_fn is not None:
            service1_tgt = modify_service_tgt_fn(service1_tgt)

        kdc_options = kdc_dict.pop('kdc_options', None)
        if kdc_options is None:
            kdc_options = str(krb5_asn1.KDCOptions('cname-in-addl-tkt'))

        service2_name = service2_creds.get_username()[:-1]
        service2_realm = service2_creds.get_realm()
        service2_service = 'host'
        service2_sname = self.PrincipalName_create(
            name_type=NT_PRINCIPAL, names=[service2_service,
                                           service2_name])
        service2_decryption_key = self.TicketDecryptionKey_from_creds(
            service2_creds)
        service2_etypes = service2_creds.tgs_supported_enctypes

        expected_error_mode = kdc_dict.pop('expected_error_mode')
        expect_status = kdc_dict.pop('expect_status', None)
        expected_status = kdc_dict.pop('expected_status', None)
        if expected_error_mode:
            check_error_fn = self.generic_check_kdc_error
            check_rep_fn = None
        else:
            check_error_fn = None
            check_rep_fn = self.generic_check_kdc_rep

            self.assertIsNone(expect_status)
            self.assertIsNone(expected_status)

        expect_edata = kdc_dict.pop('expect_edata', None)
        if expect_edata is not None:
            self.assertTrue(expected_error_mode)

        pac_options = kdc_dict.pop('pac_options', None)

        use_authenticator_subkey = kdc_dict.pop('use_authenticator_subkey', True)
        if use_authenticator_subkey:
            authenticator_subkey = self.RandomKey(Enctype.AES256)
        else:
            authenticator_subkey = None

        expected_proxy_target = service2_creds.get_spn()

        expected_transited_services = kdc_dict.pop(
            'expected_transited_services', [])

        transited_service = f'host/{service1_name}@{service1_realm}'
        expected_transited_services.append(transited_service)

        kdc_exchange_dict = self.tgs_exchange_dict(
            expected_crealm=client_realm,
            expected_cname=client_cname,
            expected_srealm=service2_realm,
            expected_sname=service2_sname,
            expected_account_name=client_username,
            expected_groups=expected_groups,
            unexpected_groups=unexpected_groups,
            expected_sid=sid,
            expected_supported_etypes=service2_etypes,
            ticket_decryption_key=service2_decryption_key,
            check_error_fn=check_error_fn,
            check_rep_fn=check_rep_fn,
            check_kdc_private_fn=self.generic_check_kdc_private,
            expected_error_mode=expected_error_mode,
            expect_status=expect_status,
            expected_status=expected_status,
            callback_dict={},
            tgt=service1_tgt,
            authenticator_subkey=authenticator_subkey,
            kdc_options=kdc_options,
            pac_options=pac_options,
            expect_edata=expect_edata,
            expected_proxy_target=expected_proxy_target,
            expected_transited_services=expected_transited_services,
            expect_pac=expect_pac)

        EncAuthorizationData = kdc_dict.pop('enc-authorization-data', None)

        if EncAuthorizationData is not None:
            if authenticator_subkey is not None:
                EncAuthorizationData_key = authenticator_subkey
                EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SUBKEY
            else:
                EncAuthorizationData_key = client_service_tkt.session_key
                EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SESSION
        else:
            EncAuthorizationData_key = None
            EncAuthorizationData_usage = None

        self._generic_kdc_exchange(kdc_exchange_dict,
                                   cname=None,
                                   realm=service2_realm,
                                   sname=service2_sname,
                                   etypes=etypes,
                                   additional_tickets=additional_tickets,
                                   EncAuthorizationData=EncAuthorizationData,
                                   EncAuthorizationData_key=EncAuthorizationData_key,
                                   EncAuthorizationData_usage=EncAuthorizationData_usage)

        if not expected_error_mode:
            # Check whether the ticket contains a PAC.
            ticket = kdc_exchange_dict['rep_ticket_creds']
            self.assertElementEqual(ticket.ticket_private, 'authtime', expected_authtime)
            pac = self.get_ticket_pac(ticket, expect_pac=expect_pac)
            ticket_auth_data = ticket.ticket_private.get('authorization-data')
            expected_num_ticket_auth_data = 0
            if expect_pac:
                self.assertIsNotNone(pac)
                expected_num_ticket_auth_data += 1
            else:
                self.assertIsNone(pac)

            if EncAuthorizationData is not None:
                expected_num_ticket_auth_data += len(EncAuthorizationData)

            if expected_num_ticket_auth_data == 0:
                self.assertIsNone(ticket_auth_data)
            else:
                self.assertIsNotNone(ticket_auth_data)
                self.assertEqual(len(ticket_auth_data),
                        expected_num_ticket_auth_data)

                if EncAuthorizationData is not None:
                    enc_ad_plain = self.der_encode(
                        EncAuthorizationData,
                        asn1Spec=krb5_asn1.AuthorizationData())
                    req_EncAuthorizationData = self.der_decode(
                        enc_ad_plain,
                        asn1Spec=krb5_asn1.AuthorizationData())

                    rep_EncAuthorizationData = ticket_auth_data.copy()
                    if expect_pac:
                        rep_EncAuthorizationData.pop(0)
                    self.assertEqual(rep_EncAuthorizationData, req_EncAuthorizationData)

        # Ensure we used all the parameters given to us.
        self.assertEqual({}, kdc_dict)

    def skip_unless_fl2008(self):
        samdb = self.get_samdb()
        functional_level = self.get_domain_functional_level(samdb)

        if functional_level < dsdb.DS_DOMAIN_FUNCTION_2008:
            self.skipTest('RBCD requires FL2008')

    def test_constrained_delegation(self):
        # Test constrained delegation.
        self._run_delegation_test(
            {
                'expected_error_mode': 0,
                'allow_delegation': True
            })

    def test_constrained_delegation_authtime(self):
        # Test constrained delegation.
        self._run_delegation_test(
            {
                'expected_error_mode': 0,
                'allow_delegation': True,
                'authtime_delay': 2,
            })

    def test_constrained_delegation_with_enc_auth_data_subkey(self):
        # Test constrained delegation.
        EncAuthorizationData = []
        relevant_elems = []
        auth_data777 = self.AuthorizationData_create(777, b'AuthorizationData777')
        relevant_elems.append(auth_data777)
        auth_data999 = self.AuthorizationData_create(999, b'AuthorizationData999')
        relevant_elems.append(auth_data999)
        ad_relevant = self.der_encode(relevant_elems, asn1Spec=krb5_asn1.AD_IF_RELEVANT())
        ad_data = self.AuthorizationData_create(AD_IF_RELEVANT, ad_relevant)
        EncAuthorizationData.append(ad_data)
        self._run_delegation_test(
            {
                'expected_error_mode': 0,
                'allow_delegation': True,
                'enc-authorization-data': EncAuthorizationData,
            })

    def test_constrained_delegation_with_enc_auth_data_no_subkey(self):
        # Test constrained delegation.
        EncAuthorizationData = []
        relevant_elems = []
        auth_data777 = self.AuthorizationData_create(777, b'AuthorizationData777')
        relevant_elems.append(auth_data777)
        auth_data999 = self.AuthorizationData_create(999, b'AuthorizationData999')
        relevant_elems.append(auth_data999)
        ad_relevant = self.der_encode(relevant_elems, asn1Spec=krb5_asn1.AD_IF_RELEVANT())
        ad_data = self.AuthorizationData_create(AD_IF_RELEVANT, ad_relevant)
        EncAuthorizationData.append(ad_data)
        self._run_delegation_test(
            {
                'expected_error_mode': 0,
                'allow_delegation': True,
                'enc-authorization-data': EncAuthorizationData,
                'use_authenticator_subkey': False,
            })

    def test_constrained_delegation_authentication_asserted_identity(self):
        # Test constrained delegation and check asserted identity is the
        # authentication authority. Note that we should always find this
        # SID for all the requests. Just S4U2Self will have a different SID.
        self._run_delegation_test(
            {
                'expected_error_mode': 0,
                'allow_delegation': True,
                'expected_groups': {
                    (security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY,
                     SidType.EXTRA_SID,
                     self.default_attrs),
                    ...
                },
                'unexpected_groups': {
                    security.SID_SERVICE_ASSERTED_IDENTITY,
                },
            })

    def test_constrained_delegation_service_asserted_identity(self):
        # Test constrained delegation and check asserted identity is the
        # service sid is there. This is a S4U2Proxy + S4U2Self test.
        self._run_delegation_test(
            {
                'expected_error_mode': 0,
                'allow_delegation': True,
                's4u2self': True,
                'service1_opts': {
                    'trusted_to_auth_for_delegation': True,
                },
                'expected_groups': {
                    (security.SID_SERVICE_ASSERTED_IDENTITY,
                     SidType.EXTRA_SID,
                     self.default_attrs),
                    ...
                },
                'unexpected_groups': {
                    security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY,
                },
            })

    def test_constrained_delegation_no_auth_data_required(self):
        # Test constrained delegation.
        self._run_delegation_test(
            {
                'expected_error_mode': 0,
                'allow_delegation': True,
                'service2_opts': {
                    'no_auth_data_required': True
                },
                'expect_pac': False
            })

    def test_constrained_delegation_existing_delegation_info(self):
        # Test constrained delegation with an existing S4U_DELEGATION_INFO
        # structure in the PAC.

        services = ['service1', 'service2', 'service3']

        self._run_delegation_test(
            {
                'expected_error_mode': 0,
                'allow_delegation': True,
                'modify_client_tkt_fn': functools.partial(
                    self.add_delegation_info, services=services),
                'expected_transited_services': services
            })

    def test_constrained_delegation_not_allowed(self):
        # Test constrained delegation when the delegating service does not
        # allow it.
        self._run_delegation_test(
            {
                'expected_error_mode': KDC_ERR_BADOPTION,
                # We aren’t particular about whether or not we get an NTSTATUS.
                'expect_status': None,
                'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
                'allow_delegation': False
            })

    def test_constrained_delegation_no_client_pac(self):
        # Test constrained delegation when the client service ticket does not
        # contain a PAC.
        self._run_delegation_test(
            {
                'expected_error_mode': (KDC_ERR_MODIFIED,
                                        KDC_ERR_TGT_REVOKED),
                'allow_delegation': True,
                'modify_client_tkt_fn': self.remove_ticket_pac,
                'expect_edata': False
            })

    def test_constrained_delegation_no_service_pac(self):
        # Test constrained delegation when the service TGT does not contain a
        # PAC.
        self._run_delegation_test(
            {
                'expected_error_mode': KDC_ERR_TGT_REVOKED,
                'allow_delegation': True,
                'modify_service_tgt_fn': self.remove_ticket_pac,
                'expect_edata': False
            })

    def test_constrained_delegation_no_client_pac_no_auth_data_required(self):
        # Test constrained delegation when the client service ticket does not
        # contain a PAC.
        self._run_delegation_test(
            {
                'expected_error_mode': (KDC_ERR_MODIFIED,
                                        KDC_ERR_BADOPTION,
                                        KDC_ERR_TGT_REVOKED),
                'allow_delegation': True,
                'modify_client_tkt_fn': self.remove_ticket_pac,
                'expect_edata': False,
                'service2_opts': {
                    'no_auth_data_required': True
                }
            })

    def test_constrained_delegation_no_service_pac_no_auth_data_required(self):
        # Test constrained delegation when the service TGT does not contain a
        # PAC.
        self._run_delegation_test(
            {
                'expected_error_mode': KDC_ERR_TGT_REVOKED,
                'allow_delegation': True,
                'modify_service_tgt_fn': self.remove_ticket_pac,
                'service2_opts': {
                    'no_auth_data_required': True
                },
                'expect_pac': False,
                'expect_edata': False
            })

    def test_constrained_delegation_non_forwardable(self):
        # Test constrained delegation with a non-forwardable ticket.
        self._run_delegation_test(
            {
                'expected_error_mode': KDC_ERR_BADOPTION,
                # We aren’t particular about whether or not we get an NTSTATUS.
                'expect_status': None,
                'expected_status': ntstatus.NT_STATUS_ACCOUNT_RESTRICTION,
                'allow_delegation': True,
                'modify_client_tkt_fn': functools.partial(
                    self.set_ticket_forwardable, flag=False)
            })

    def test_constrained_delegation_pac_options_rbcd(self):
        # Test constrained delegation, but with the RBCD bit set in the PAC
        # options.
        self._run_delegation_test(
            {
                'expected_error_mode': 0,
                'pac_options': '0001',  # supports RBCD
                'allow_delegation': True
            })

    def test_rbcd(self):
        # Test resource-based constrained delegation.
        self.skip_unless_fl2008()

        self._run_delegation_test(
            {
                'expected_error_mode': 0,
                'allow_rbcd': True,
                'pac_options': '0001',  # supports RBCD
            })

    def test_rbcd_no_auth_data_required(self):
        self.skip_unless_fl2008()

        self._run_delegation_test(
            {
                'expected_error_mode': 0,
                'allow_rbcd': True,
                'pac_options': '0001',  # supports RBCD
                'service2_opts': {
                    'no_auth_data_required': True
                },
                'expect_pac': False
            })

    def test_rbcd_existing_delegation_info(self):
        self.skip_unless_fl2008()

        # Test constrained delegation with an existing S4U_DELEGATION_INFO
        # structure in the PAC.

        services = ['service1', 'service2', 'service3']

        self._run_delegation_test(
            {
                'expected_error_mode': 0,
                'allow_rbcd': True,
                'pac_options': '0001',  # supports RBCD
                'modify_client_tkt_fn': functools.partial(
                    self.add_delegation_info, services=services),
                'expected_transited_services': services
            })

    def test_rbcd_not_allowed(self):
        # Test resource-based constrained delegation when the target service
        # does not allow it.
        self._run_delegation_test(
            {
                'expected_error_mode': KDC_ERR_BADOPTION,
                # We aren’t particular about whether or not we get an NTSTATUS.
                'expect_status': None,
                'expected_status': ntstatus.NT_STATUS_NOT_FOUND,
                'allow_rbcd': False,
                'pac_options': '0001'  # supports RBCD
            })

    def test_rbcd_no_client_pac_a(self):
        self.skip_unless_fl2008()

        # Test constrained delegation when the client service ticket does not
        # contain a PAC, and an empty msDS-AllowedToDelegateTo attribute.
        self._run_delegation_test(
            {
                'expected_error_mode': (KDC_ERR_MODIFIED,
                                        KDC_ERR_TGT_REVOKED),
                # We aren’t particular about whether or not we get an NTSTATUS.
                'expect_status': None,
                'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
                'allow_rbcd': True,
                'pac_options': '0001',  # supports RBCD
                'modify_client_tkt_fn': self.remove_ticket_pac
            })

    def test_rbcd_no_client_pac_b(self):
        self.skip_unless_fl2008()

        # Test constrained delegation when the client service ticket does not
        # contain a PAC, and a non-empty msDS-AllowedToDelegateTo attribute.
        self._run_delegation_test(
            {
                'expected_error_mode': (KDC_ERR_MODIFIED,
                                        KDC_ERR_TGT_REVOKED),
                # We aren’t particular about whether or not we get an NTSTATUS.
                'expect_status': None,
                'expected_status': ntstatus.NT_STATUS_NO_MATCH,
                'allow_rbcd': True,
                'pac_options': '0001',  # supports RBCD
                'modify_client_tkt_fn': self.remove_ticket_pac,
                'service1_opts': {
                    'delegation_to_spn': ('host/test')
                }
            })

    def test_rbcd_no_service_pac(self):
        self.skip_unless_fl2008()

        # Test constrained delegation when the service TGT does not contain a
        # PAC.
        self._run_delegation_test(
            {
                'expected_error_mode': KDC_ERR_TGT_REVOKED,
                'allow_rbcd': True,
                'pac_options': '0001',  # supports RBCD
                'modify_service_tgt_fn': self.remove_ticket_pac,
                'expect_edata': False
            })

    def test_rbcd_no_client_pac_no_auth_data_required_a(self):
        self.skip_unless_fl2008()

        # Test constrained delegation when the client service ticket does not
        # contain a PAC, and an empty msDS-AllowedToDelegateTo attribute.
        self._run_delegation_test(
            {
                'expected_error_mode': (KDC_ERR_MODIFIED,
                                        KDC_ERR_TGT_REVOKED),
                # We aren’t particular about whether or not we get an NTSTATUS.
                'expect_status': None,
                'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
                'allow_rbcd': True,
                'pac_options': '0001',  # supports RBCD
                'modify_client_tkt_fn': self.remove_ticket_pac,
                'service2_opts': {
                    'no_auth_data_required': True
                }
            })

    def test_rbcd_no_client_pac_no_auth_data_required_b(self):
        self.skip_unless_fl2008()

        # Test constrained delegation when the client service ticket does not
        # contain a PAC, and a non-empty msDS-AllowedToDelegateTo attribute.
        self._run_delegation_test(
            {
                'expected_error_mode': (KDC_ERR_MODIFIED,
                                        KDC_ERR_TGT_REVOKED),
                # We aren’t particular about whether or not we get an NTSTATUS.
                'expect_status': None,
                'expected_status': ntstatus.NT_STATUS_NO_MATCH,
                'allow_rbcd': True,
                'pac_options': '0001',  # supports RBCD
                'modify_client_tkt_fn': self.remove_ticket_pac,
                'service1_opts': {
                    'delegation_to_spn': ('host/test')
                },
                'service2_opts': {
                    'no_auth_data_required': True
                }
            })

    def test_rbcd_no_service_pac_no_auth_data_required(self):
        self.skip_unless_fl2008()

        # Test constrained delegation when the service TGT does not contain a
        # PAC.
        self._run_delegation_test(
            {
                'expected_error_mode': KDC_ERR_TGT_REVOKED,
                'allow_rbcd': True,
                'pac_options': '0001',  # supports RBCD
                'modify_service_tgt_fn': self.remove_ticket_pac,
                'service2_opts': {
                    'no_auth_data_required': True
                },
                'expect_edata': False
            })

    def test_rbcd_non_forwardable(self):
        self.skip_unless_fl2008()

        # Test resource-based constrained delegation with a non-forwardable
        # ticket.
        self._run_delegation_test(
            {
                'expected_error_mode': KDC_ERR_BADOPTION,
                # We aren’t particular about whether or not we get an NTSTATUS.
                'expect_status': None,
                'expected_status': ntstatus.NT_STATUS_ACCOUNT_RESTRICTION,
                'allow_rbcd': True,
                'pac_options': '0001',  # supports RBCD
                'modify_client_tkt_fn': functools.partial(
                    self.set_ticket_forwardable, flag=False)
            })

    def test_rbcd_no_pac_options_a(self):
        self.skip_unless_fl2008()

        # Test resource-based constrained delegation without the RBCD bit set
        # in the PAC options, and an empty msDS-AllowedToDelegateTo attribute.
        self._run_delegation_test(
            {
                'expected_error_mode': KDC_ERR_BADOPTION,
                # We aren’t particular about whether or not we get an NTSTATUS.
                'expect_status': None,
                'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
                'allow_rbcd': True,
                'pac_options': '1'  # does not support RBCD
            })

    def test_rbcd_no_pac_options_b(self):
        self.skip_unless_fl2008()

        # Test resource-based constrained delegation without the RBCD bit set
        # in the PAC options, and a non-empty msDS-AllowedToDelegateTo
        # attribute.
        self._run_delegation_test(
            {
                'expected_error_mode': KDC_ERR_BADOPTION,
                # We aren’t particular about whether or not we get an NTSTATUS.
                'expect_status': None,
                'expected_status': ntstatus.NT_STATUS_NO_MATCH,
                'allow_rbcd': True,
                'pac_options': '1',  # does not support RBCD
                'service1_opts': {
                    'delegation_to_spn': ('host/test')
                }
            })

    def test_bronze_bit_constrained_delegation_old_checksum(self):
        # Attempt to modify the ticket without updating the PAC checksums.
        self._run_delegation_test(
            {
                'expected_error_mode': (KDC_ERR_MODIFIED,
                                        KDC_ERR_BAD_INTEGRITY),
                'allow_delegation': True,
                'client_tkt_options': '0',  # non-forwardable ticket
                'modify_client_tkt_fn': functools.partial(
                    self.set_ticket_forwardable,
                    flag=True, update_pac_checksums=False),
                'expect_edata': False
            })

    def test_bronze_bit_rbcd_old_checksum(self):
        self.skip_unless_fl2008()

        # Attempt to modify the ticket without updating the PAC checksums.
        self._run_delegation_test(
            {
                'expected_error_mode': (KDC_ERR_MODIFIED,
                                        KDC_ERR_BAD_INTEGRITY),
                # We aren’t particular about whether or not we get an NTSTATUS.
                'expect_status': None,
                'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
                'allow_rbcd': True,
                'pac_options': '0001',  # supports RBCD
                'client_tkt_options': '0',  # non-forwardable ticket
                'modify_client_tkt_fn': functools.partial(
                    self.set_ticket_forwardable,
                    flag=True, update_pac_checksums=False)
            })

    def test_constrained_delegation_missing_client_checksum(self):
        # Present a user ticket without the required checksums.
        for checksum in self.pac_checksum_types:
            with self.subTest(checksum=checksum):
                if checksum == krb5pac.PAC_TYPE_TICKET_CHECKSUM:
                    expected_error_mode = (KDC_ERR_MODIFIED,
                                           KDC_ERR_BADOPTION)
                else:
                    expected_error_mode = KDC_ERR_GENERIC

                self._run_delegation_test(
                    {
                        'expected_error_mode': expected_error_mode,
                        'allow_delegation': True,
                        'modify_client_tkt_fn': functools.partial(
                            self.remove_pac_checksum, checksum=checksum),
                        'expect_edata': False
                    })

    def test_constrained_delegation_missing_service_checksum(self):
        # Present the service's ticket without the required checksums.
        for checksum in (krb5pac.PAC_TYPE_SRV_CHECKSUM,
                         krb5pac.PAC_TYPE_KDC_CHECKSUM):
            with self.subTest(checksum=checksum):
                self._run_delegation_test(
                    {
                        'expected_error_mode': KDC_ERR_GENERIC,
                        # We aren’t particular about whether or not we get an
                        # NTSTATUS.
                        'expect_status': None,
                        'expected_status':
                            ntstatus.NT_STATUS_INSUFFICIENT_RESOURCES,
                        'allow_delegation': True,
                        'modify_service_tgt_fn': functools.partial(
                            self.remove_pac_checksum, checksum=checksum)
                    })

    def test_rbcd_missing_client_checksum(self):
        self.skip_unless_fl2008()

        # Present a user ticket without the required checksums.
        for checksum in self.pac_checksum_types:
            with self.subTest(checksum=checksum):
                if checksum == krb5pac.PAC_TYPE_TICKET_CHECKSUM:
                    expected_error_mode = (KDC_ERR_MODIFIED,
                                           KDC_ERR_BADOPTION)
                else:
                    expected_error_mode = KDC_ERR_GENERIC

                self._run_delegation_test(
                    {
                        'expected_error_mode': expected_error_mode,
                        # We aren’t particular about whether or not we get an
                        # NTSTATUS.
                        'expect_status': None,
                        'expected_status':
                            ntstatus.NT_STATUS_NOT_SUPPORTED,
                        'allow_rbcd': True,
                        'pac_options': '0001',  # supports RBCD
                        'modify_client_tkt_fn': functools.partial(
                            self.remove_pac_checksum, checksum=checksum)
                    })

    def test_rbcd_missing_service_checksum(self):
        self.skip_unless_fl2008()

        # Present the service's ticket without the required checksums.
        for checksum in (krb5pac.PAC_TYPE_SRV_CHECKSUM,
                         krb5pac.PAC_TYPE_KDC_CHECKSUM):
            with self.subTest(checksum=checksum):
                self._run_delegation_test(
                    {
                        'expected_error_mode': KDC_ERR_GENERIC,
                        # We aren’t particular about whether or not we get an
                        # NTSTATUS.
                        'expect_status': None,
                        'expected_status':
                            ntstatus.NT_STATUS_INSUFFICIENT_RESOURCES,
                        'allow_rbcd': True,
                        'pac_options': '0001',  # supports RBCD
                        'modify_service_tgt_fn': functools.partial(
                            self.remove_pac_checksum, checksum=checksum)
                    })

    def test_constrained_delegation_zeroed_client_checksum(self):
        # Present a user ticket with invalid checksums.
        for checksum in self.pac_checksum_types:
            with self.subTest(checksum=checksum):
                self._run_delegation_test(
                    {
                        'expected_error_mode': (KDC_ERR_MODIFIED,
                                                KDC_ERR_BAD_INTEGRITY),
                        'allow_delegation': True,
                        'modify_client_tkt_fn': functools.partial(
                            self.zeroed_pac_checksum, checksum=checksum),
                        'expect_edata': False
                    })

    def test_constrained_delegation_zeroed_service_checksum(self):
        # Present the service's ticket with invalid checksums.
        for checksum in self.pac_checksum_types:
            with self.subTest(checksum=checksum):
                if checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM:
                    expected_error_mode = (KDC_ERR_MODIFIED,
                                           KDC_ERR_BAD_INTEGRITY)
                    # We aren’t particular about whether or not we get an
                    # NTSTATUS.
                    expect_status = None
                    expected_status = ntstatus.NT_STATUS_WRONG_PASSWORD
                else:
                    expected_error_mode = 0
                    expect_status = None
                    expected_status = None

                self._run_delegation_test(
                    {
                        'expected_error_mode': expected_error_mode,
                        'expect_status': expect_status,
                        'expected_status': expected_status,
                        'allow_delegation': True,
                        'modify_service_tgt_fn': functools.partial(
                            self.zeroed_pac_checksum, checksum=checksum)
                    })

    def test_rbcd_zeroed_client_checksum(self):
        self.skip_unless_fl2008()

        # Present a user ticket with invalid checksums.
        for checksum in self.pac_checksum_types:
            with self.subTest(checksum=checksum):
                self._run_delegation_test(
                    {
                        'expected_error_mode': (KDC_ERR_MODIFIED,
                                                KDC_ERR_BAD_INTEGRITY),
                        # We aren’t particular about whether or not we get an
                        # NTSTATUS.
                        'expect_status': None,
                        'expected_status':
                            ntstatus.NT_STATUS_NOT_SUPPORTED,
                        'allow_rbcd': True,
                        'pac_options': '0001',  # supports RBCD
                        'modify_client_tkt_fn': functools.partial(
                            self.zeroed_pac_checksum, checksum=checksum)
                    })

    def test_rbcd_zeroed_service_checksum(self):
        self.skip_unless_fl2008()

        # Present the service's ticket with invalid checksums.
        for checksum in self.pac_checksum_types:
            with self.subTest(checksum=checksum):
                if checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM:
                    expected_error_mode = (KDC_ERR_MODIFIED,
                                           KDC_ERR_BAD_INTEGRITY)
                    # We aren’t particular about whether or not we get an
                    # NTSTATUS.
                    expect_status = None
                    expected_status = ntstatus.NT_STATUS_WRONG_PASSWORD
                else:
                    expected_error_mode = 0
                    expect_status = None
                    expected_status = None

                self._run_delegation_test(
                    {
                        'expected_error_mode': expected_error_mode,
                        'expect_status': expect_status,
                        'expected_status': expected_status,
                        'allow_rbcd': True,
                        'pac_options': '0001',  # supports RBCD
                        'modify_service_tgt_fn': functools.partial(
                            self.zeroed_pac_checksum, checksum=checksum)
                    })

    unkeyed_ctypes = {Cksumtype.MD5, Cksumtype.SHA1, Cksumtype.CRC32}

    def test_constrained_delegation_unkeyed_client_checksum(self):
        # Present a user ticket with invalid checksums.
        for checksum in self.pac_checksum_types:
            for ctype in self.unkeyed_ctypes:
                with self.subTest(checksum=checksum, ctype=ctype):
                    if (checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM
                            and ctype == Cksumtype.SHA1):
                        expected_error_mode = (KDC_ERR_SUMTYPE_NOSUPP,
                                               KDC_ERR_INAPP_CKSUM)
                    else:
                        expected_error_mode = (KDC_ERR_GENERIC,
                                               KDC_ERR_INAPP_CKSUM)

                    self._run_delegation_test(
                        {
                            'expected_error_mode': expected_error_mode,
                            'allow_delegation': True,
                            'modify_client_tkt_fn': functools.partial(
                                self.unkeyed_pac_checksum,
                                checksum=checksum, ctype=ctype),
                            'expect_edata': False
                        })

    def test_constrained_delegation_unkeyed_service_checksum(self):
        # Present the service's ticket with invalid checksums.
        for checksum in self.pac_checksum_types:
            for ctype in self.unkeyed_ctypes:
                with self.subTest(checksum=checksum, ctype=ctype):
                    if checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM:
                        # We aren’t particular about whether or not we get an
                        # NTSTATUS.
                        expect_status = None
                        if ctype == Cksumtype.SHA1:
                            expected_error_mode = (KDC_ERR_SUMTYPE_NOSUPP,
                                                   KDC_ERR_INAPP_CKSUM)
                            expected_status = ntstatus.NT_STATUS_LOGON_FAILURE
                        else:
                            expected_error_mode = (KDC_ERR_GENERIC,
                                                   KDC_ERR_INAPP_CKSUM)
                            expected_status = (
                                ntstatus.NT_STATUS_INSUFFICIENT_RESOURCES)
                    else:
                        expected_error_mode = 0
                        expect_status = None
                        expected_status = None

                    self._run_delegation_test(
                        {
                            'expected_error_mode': expected_error_mode,
                            'expect_status': expect_status,
                            'expected_status': expected_status,
                            'allow_delegation': True,
                            'modify_service_tgt_fn': functools.partial(
                                self.unkeyed_pac_checksum,
                                checksum=checksum, ctype=ctype)
                        })

    def test_rbcd_unkeyed_client_checksum(self):
        self.skip_unless_fl2008()

        # Present a user ticket with invalid checksums.
        for checksum in self.pac_checksum_types:
            for ctype in self.unkeyed_ctypes:
                with self.subTest(checksum=checksum, ctype=ctype):
                    if (checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM
                            and ctype == Cksumtype.SHA1):
                        expected_error_mode = (KDC_ERR_SUMTYPE_NOSUPP,
                                               KDC_ERR_INAPP_CKSUM)
                    else:
                        expected_error_mode = (KDC_ERR_GENERIC,
                                               KDC_ERR_INAPP_CKSUM)

                    self._run_delegation_test(
                        {
                            'expected_error_mode': expected_error_mode,
                            # We aren’t particular about whether or not we get
                            # an NTSTATUS.
                            'expect_status': None,
                            'expected_status':
                                ntstatus.NT_STATUS_NOT_SUPPORTED,
                            'allow_rbcd': True,
                            'pac_options': '0001',  # supports RBCD
                            'modify_client_tkt_fn': functools.partial(
                                self.unkeyed_pac_checksum,
                                checksum=checksum, ctype=ctype)
                        })

    def test_rbcd_unkeyed_service_checksum(self):
        self.skip_unless_fl2008()

        # Present the service's ticket with invalid checksums.
        for checksum in self.pac_checksum_types:
            for ctype in self.unkeyed_ctypes:
                with self.subTest(checksum=checksum, ctype=ctype):
                    if checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM:
                        # We aren’t particular about whether or not we get an
                        # NTSTATUS.
                        expect_status = None
                        if ctype == Cksumtype.SHA1:
                            expected_error_mode = (KDC_ERR_SUMTYPE_NOSUPP,
                                                   KDC_ERR_INAPP_CKSUM)
                            expected_status = ntstatus.NT_STATUS_LOGON_FAILURE
                        else:
                            expected_error_mode = (KDC_ERR_GENERIC,
                                                   KDC_ERR_INAPP_CKSUM)
                            expected_status = (
                                ntstatus.NT_STATUS_INSUFFICIENT_RESOURCES)
                    else:
                        expected_error_mode = 0
                        expect_status = None
                        expected_status = None

                    self._run_delegation_test(
                        {
                            'expected_error_mode': expected_error_mode,
                            'expect_status': expect_status,
                            'expected_status': expected_status,
                            'allow_rbcd': True,
                            'pac_options': '0001',  # supports RBCD
                            'modify_service_tgt_fn': functools.partial(
                                self.unkeyed_pac_checksum,
                                checksum=checksum, ctype=ctype)
                        })

    def test_constrained_delegation_rc4_client_checksum(self):
        # Present a user ticket with RC4 checksums.
        samdb = self.get_samdb()
        functional_level = self.get_domain_functional_level(samdb)

        if functional_level >= dsdb.DS_DOMAIN_FUNCTION_2008:
            expected_error_mode = (KDC_ERR_GENERIC,
                                   KDC_ERR_INAPP_CKSUM)
            expect_edata = False
        else:
            expected_error_mode = 0
            expect_edata = None

        self._run_delegation_test(
            {
                'expected_error_mode': expected_error_mode,
                'allow_delegation': True,
                'modify_client_tkt_fn': self.rc4_pac_checksums,
                'expect_edata': expect_edata,
            })

    def test_rbcd_rc4_client_checksum(self):
        self.skip_unless_fl2008()

        # Present a user ticket with RC4 checksums.
        expected_error_mode = (KDC_ERR_GENERIC,
                               KDC_ERR_BADOPTION)

        self._run_delegation_test(
            {
                'expected_error_mode': expected_error_mode,
                # We aren’t particular about whether or not we get an NTSTATUS.
                'expect_status': None,
                'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
                'allow_rbcd': True,
                'pac_options': '0001',  # supports RBCD
                'modify_client_tkt_fn': self.rc4_pac_checksums,
            })

    def test_constrained_delegation_rodc_issued(self):
        self._run_delegation_test(
            {
                # Test that RODC-issued constrained delegation tickets are
                # accepted.
                'expected_error_mode': 0,
                'allow_delegation': True,
                # Both tickets must be signed by the same RODC.
                'modify_client_tkt_fn': self.signed_by_rodc,
                'modify_service_tgt_fn': self.issued_by_rodc,
                'client_opts': {
                    'allowed_replication_mock': True,
                    'revealed_to_mock_rodc': True,
                },
                'service1_opts': {
                    'allowed_replication_mock': True,
                    'revealed_to_mock_rodc': True,
                },
            })

    def test_rbcd_rodc_issued(self):
        self.skip_unless_fl2008()

        self._run_delegation_test(
            {
                # Test that RODC-issued constrained delegation tickets are
                # accepted.
                'expected_error_mode': 0,
                'allow_rbcd': True,
                'pac_options': '0001',  # supports RBCD
                # Both tickets must be signed by the same RODC.
                'modify_client_tkt_fn': self.signed_by_rodc,
                'modify_service_tgt_fn': self.issued_by_rodc,
                'client_opts': {
                    'allowed_replication_mock': True,
                    'revealed_to_mock_rodc': True,
                },
                'service1_opts': {
                    'allowed_replication_mock': True,
                    'revealed_to_mock_rodc': True,
                },
            })

    def remove_pac_checksum(self, ticket, checksum):
        checksum_keys = self.get_krbtgt_checksum_key()

        return self.modified_ticket(ticket,
                                    checksum_keys=checksum_keys,
                                    include_checksums={checksum: False})

    def zeroed_pac_checksum(self, ticket, checksum):
        krbtgt_creds = self.get_krbtgt_creds()
        krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)

        server_key = ticket.decryption_key

        checksum_keys = {
            krb5pac.PAC_TYPE_SRV_CHECKSUM: server_key,
            krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key,
            krb5pac.PAC_TYPE_TICKET_CHECKSUM: krbtgt_key,
        }

        if checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM:
            zeroed_key = server_key
        else:
            zeroed_key = krbtgt_key

        checksum_keys[checksum] = ZeroedChecksumKey(zeroed_key.key,
                                                    zeroed_key.kvno)

        return self.modified_ticket(ticket,
                                    checksum_keys=checksum_keys,
                                    include_checksums={checksum: True})

    def unkeyed_pac_checksum(self, ticket, checksum, ctype):
        krbtgt_creds = self.get_krbtgt_creds()
        krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)

        server_key = ticket.decryption_key

        checksum_keys = {
            krb5pac.PAC_TYPE_SRV_CHECKSUM: server_key,
            krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key,
            krb5pac.PAC_TYPE_TICKET_CHECKSUM: krbtgt_key,
            krb5pac.PAC_TYPE_FULL_CHECKSUM: krbtgt_key,
        }

        # Make a copy of the existing key and change the ctype.
        key = checksum_keys[checksum]
        new_key = RodcPacEncryptionKey(key.key, key.kvno)
        new_key.ctype = ctype
        checksum_keys[checksum] = new_key

        return self.modified_ticket(ticket,
                                    checksum_keys=checksum_keys,
                                    include_checksums={checksum: True})

    def rc4_pac_checksums(self, ticket):
        krbtgt_creds = self.get_krbtgt_creds()
        rc4_krbtgt_key = self.TicketDecryptionKey_from_creds(
            krbtgt_creds, etype=Enctype.RC4)

        server_key = ticket.decryption_key

        checksum_keys = {
            krb5pac.PAC_TYPE_SRV_CHECKSUM: server_key,
            krb5pac.PAC_TYPE_KDC_CHECKSUM: rc4_krbtgt_key,
            krb5pac.PAC_TYPE_TICKET_CHECKSUM: rc4_krbtgt_key,
            krb5pac.PAC_TYPE_FULL_CHECKSUM: rc4_krbtgt_key,
        }

        include_checksums = {
            krb5pac.PAC_TYPE_SRV_CHECKSUM: True,
            krb5pac.PAC_TYPE_KDC_CHECKSUM: True,
            krb5pac.PAC_TYPE_TICKET_CHECKSUM: True,
            krb5pac.PAC_TYPE_FULL_CHECKSUM: True,
        }

        return self.modified_ticket(ticket,
                                    checksum_keys=checksum_keys,
                                    include_checksums=include_checksums)

    def add_delegation_info(self, ticket, services=None):
        def modify_pac_fn(pac):
            pac_buffers = pac.buffers
            self.assertNotIn(krb5pac.PAC_TYPE_CONSTRAINED_DELEGATION,
                             (buffer.type for buffer in pac_buffers))

            transited_services = list(map(lsa.String, services))

            delegation = krb5pac.PAC_CONSTRAINED_DELEGATION()
            delegation.proxy_target = lsa.String('test_proxy_target')
            delegation.transited_services = transited_services
            delegation.num_transited_services = len(transited_services)

            info = krb5pac.PAC_CONSTRAINED_DELEGATION_CTR()
            info.info = delegation

            pac_buffer = krb5pac.PAC_BUFFER()
            pac_buffer.type = krb5pac.PAC_TYPE_CONSTRAINED_DELEGATION
            pac_buffer.info = info

            pac_buffers.append(pac_buffer)

            pac.buffers = pac_buffers
            pac.num_buffers += 1

            return pac

        checksum_keys = self.get_krbtgt_checksum_key()

        return self.modified_ticket(ticket,
                                    checksum_keys=checksum_keys,
                                    modify_pac_fn=modify_pac_fn)

    def set_ticket_forwardable(self, ticket, flag, update_pac_checksums=True):
        modify_fn = functools.partial(self.modify_ticket_flag,
                                      flag='forwardable',
                                      value=flag)

        if update_pac_checksums:
            checksum_keys = self.get_krbtgt_checksum_key()
        else:
            checksum_keys = None

        return self.modified_ticket(ticket,
                                    modify_fn=modify_fn,
                                    checksum_keys=checksum_keys,
                                    update_pac_checksums=update_pac_checksums)

    def remove_ticket_pac(self, ticket):
        return self.modified_ticket(ticket,
                                    exclude_pac=True)


if __name__ == "__main__":
    global_asn1_print = False
    global_hexdump = False
    import unittest
    unittest.main()
