# Unix SMB/CIFS implementation.
# Copyright (C) Andrew Bartlett <abartlet@catalyst.net.nz>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import os
import ldb
import re

from samba.auth import system_session
from samba.samdb import SamDB
from samba.ndr import ndr_unpack, ndr_pack
from samba.dcerpc import dnsp
from samba.tests.samba_tool.base import SambaToolCmdTest
import time
from samba import dsdb_dns


class DnsCmdTestCase(SambaToolCmdTest):
    def setUp(self):
        super(DnsCmdTestCase, self).setUp()

        self.dburl = "ldap://%s" % os.environ["SERVER"]
        self.creds_string = "-U%s%%%s" % (os.environ["DC_USERNAME"],
                                          os.environ["DC_PASSWORD"])

        self.samdb = self.getSamDB("-H", self.dburl, self.creds_string)
        self.config_dn = str(self.samdb.get_config_basedn())

        self.testip = "192.168.0.193"
        self.testip2 = "192.168.0.194"

        self.addCleanup(self.deleteZone)
        self.addZone()

        # Note: SOA types don't work (and shouldn't), as we only have one zone per DNS record.

        good_dns = ["SAMDOM.EXAMPLE.COM",
                    "1.EXAMPLE.COM",
                    "%sEXAMPLE.COM" % ("1." * 100),
                    "EXAMPLE",
                    "!@#$%^&*()_",
                    "HIGH\xFFBYTE",
                    "@.EXAMPLE.COM",
                    "."]
        bad_dns = ["...",
                   ".EXAMPLE.COM",
                   ".EXAMPLE.",
                   "",
                   "SAMDOM..EXAMPLE.COM"]

        good_mx = ["SAMDOM.EXAMPLE.COM 65530",
                   "SAMDOM.EXAMPLE.COM     0"]
        bad_mx = ["SAMDOM.EXAMPLE.COM -1",
                  "SAMDOM.EXAMPLE.COM",
                  " ",
                  "SAMDOM.EXAMPLE.COM 1 1",
                  "SAMDOM.EXAMPLE.COM SAMDOM.EXAMPLE.COM"]

        good_srv = ["SAMDOM.EXAMPLE.COM 65530 65530 65530",
                    "SAMDOM.EXAMPLE.COM     1     1     1"]
        bad_srv = ["SAMDOM.EXAMPLE.COM 0 65536 0",
                   "SAMDOM.EXAMPLE.COM 0 0 65536",
                   "SAMDOM.EXAMPLE.COM 65536 0 0"]

        for bad_dn in bad_dns:
            bad_mx.append("%s 1" % bad_dn)
            bad_srv.append("%s 0 0 0" % bad_dn)
        for good_dn in good_dns:
            good_mx.append("%s 1" % good_dn)
            good_srv.append("%s 0 0 0" % good_dn)

        self.good_records = {
                "A":["192.168.0.1", "255.255.255.255"],
                "AAAA":["1234:5678:9ABC:DEF0:0000:0000:0000:0000",
                        "0000:0000:0000:0000:0000:0000:0000:0000",
                        "1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0",
                        "1234:1234:1234::",
                        "1234:5678:9ABC:DEF0::",
                        "0000:0000::0000",
                        "1234::5678:9ABC:0000:0000:0000:0000",
                        "::1",
                        "::",
                        "1:1:1:1:1:1:1:1"],
                "PTR": good_dns,
                "CNAME": good_dns,
                "NS": good_dns,
                "MX": good_mx,
                "SRV": good_srv,
                "TXT": ["text", "", "@#!", "\n"]
        }

        self.bad_records = {
                "A":["192.168.0.500",
                     "255.255.255.255/32"],
                "AAAA":["GGGG:1234:5678:9ABC:0000:0000:0000:0000",
                        "0000:0000:0000:0000:0000:0000:0000:0000/1",
                        "AAAA:AAAA:AAAA:AAAA:G000:0000:0000:1234",
                        "1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0:1234",
                        "1234:5678:9ABC:DEF0:1234:5678:9ABC",
                        "1111::1111::1111"],
                "PTR": bad_dns,
                "CNAME": bad_dns,
                "NS": bad_dns,
                "MX": bad_mx,
                "SRV": bad_srv
        }

    def resetZone(self):
        self.deleteZone()
        self.addZone()

    def addZone(self):
        self.zone = "zone"
        result, out, err = self.runsubcmd("dns",
                                          "zonecreate",
                                          os.environ["SERVER"],
                                          self.zone,
                                          self.creds_string)
        self.assertCmdSuccess(result, out, err)

    def deleteZone(self):
        result, out, err = self.runsubcmd("dns",
                                          "zonedelete",
                                          os.environ["SERVER"],
                                          self.zone,
                                          self.creds_string)
        self.assertCmdSuccess(result, out, err)

    def get_all_records(self, zone_name):
        zone_dn = (f"DC={zone_name},CN=MicrosoftDNS,DC=DomainDNSZones,"
                   f"{self.samdb.get_default_basedn()}")

        expression = "(&(objectClass=dnsNode)(!(dNSTombstoned=TRUE)))"

        nodes = self.samdb.search(base=zone_dn, scope=ldb.SCOPE_SUBTREE,
                                  expression=expression,
                                  attrs=["dnsRecord", "name"])

        record_map = {}
        for node in nodes:
            name = node["name"][0].decode()
            record_map[name] = list(node["dnsRecord"])

        return record_map

    def get_record_from_db(self, zone_name, record_name):
        zones = self.samdb.search(base="DC=DomainDnsZones,%s"
                                  % self.samdb.get_default_basedn(),
                                  scope=ldb.SCOPE_SUBTREE,
                                  expression="(objectClass=dnsZone)",
                                  attrs=["cn"])

        for zone in zones:
            if zone_name in str(zone.dn):
                zone_dn = zone.dn
                break

        records = self.samdb.search(base=zone_dn, scope=ldb.SCOPE_SUBTREE,
                                    expression="(objectClass=dnsNode)",
                                    attrs=["dnsRecord"])

        for old_packed_record in records:
            if record_name in str(old_packed_record.dn):
                return (old_packed_record.dn,
                        ndr_unpack(dnsp.DnssrvRpcRecord,
                                   old_packed_record["dnsRecord"][0]))

    def test_rank_none(self):
        record_str = "192.168.50.50"
        record_type_str = "A"

        result, out, err = self.runsubcmd("dns", "add", os.environ["SERVER"],
                                          self.zone, "testrecord", record_type_str,
                                          record_str, self.creds_string)
        self.assertCmdSuccess(result, out, err,
                              "Failed to add record '%s' with type %s."
                              % (record_str, record_type_str))

        dn, record = self.get_record_from_db(self.zone, "testrecord")
        record.rank = 0  # DNS_RANK_NONE
        res = self.samdb.dns_replace_by_dn(dn, [record])
        if res is not None:
            self.fail("Unable to update dns record to have DNS_RANK_NONE.")

        errors = []

        # The record should still exist
        result, out, err = self.runsubcmd("dns", "query", os.environ["SERVER"],
                                          self.zone, "testrecord", record_type_str,
                                          self.creds_string)
        try:
            self.assertCmdSuccess(result, out, err,
                                  "Failed to query for a record"
                                  "which had DNS_RANK_NONE.")
            self.assertTrue("testrecord" in out and record_str in out,
                            "Query for a record which had DNS_RANK_NONE"
                            "succeeded but produced no resulting records.")
        except AssertionError as e:
            # Windows produces no resulting records
            pass

        # We should not be able to add a duplicate
        result, out, err = self.runsubcmd("dns", "add", os.environ["SERVER"],
                                          self.zone, "testrecord", record_type_str,
                                          record_str, self.creds_string)
        try:
            self.assertCmdFail(result, "Successfully added duplicate record"
                               "of one which had DNS_RANK_NONE.")
        except AssertionError as e:
            errors.append(e)

        # We should be able to delete it
        result, out, err = self.runsubcmd("dns", "delete", os.environ["SERVER"],
                                          self.zone, "testrecord", record_type_str,
                                          record_str, self.creds_string)
        try:
            self.assertCmdSuccess(result, out, err, "Failed to delete record"
                                  "which had DNS_RANK_NONE.")
        except AssertionError as e:
            errors.append(e)

        # Now the record should not exist
        result, out, err = self.runsubcmd("dns", "query", os.environ["SERVER"],
                                          self.zone, "testrecord",
                                          record_type_str, self.creds_string)
        try:
            self.assertCmdFail(result, "Successfully queried for deleted record"
                               "which had DNS_RANK_NONE.")
        except AssertionError as e:
            errors.append(e)

        if len(errors) > 0:
            err_str = "Failed appropriate behaviour with DNS_RANK_NONE:"
            for error in errors:
                err_str = err_str + "\n" + str(error)
            raise AssertionError(err_str)

    def test_accept_valid_commands(self):
        """
        For all good records, attempt to add, query and delete them.
        """
        num_failures = 0
        failure_msgs = []
        for dnstype in self.good_records:
            for record in self.good_records[dnstype]:
                try:
                    result, out, err = self.runsubcmd("dns", "add",
                                                      os.environ["SERVER"],
                                                      self.zone, "testrecord",
                                                      dnstype, record,
                                                      self.creds_string)
                    self.assertCmdSuccess(result, out, err, "Failed to add"
                                          "record %s with type %s."
                                          % (record, dnstype))

                    result, out, err = self.runsubcmd("dns", "query",
                                                      os.environ["SERVER"],
                                                      self.zone, "testrecord",
                                                      dnstype,
                                                      self.creds_string)
                    self.assertCmdSuccess(result, out, err, "Failed to query"
                                          "record %s with qualifier %s."
                                          % (record, dnstype))

                    result, out, err = self.runsubcmd("dns", "delete",
                                                      os.environ["SERVER"],
                                                      self.zone, "testrecord",
                                                      dnstype, record,
                                                      self.creds_string)
                    self.assertCmdSuccess(result, out, err, "Failed to remove"
                                          "record %s with type %s."
                                          % (record, dnstype))
                except AssertionError as e:
                    num_failures = num_failures + 1
                    failure_msgs.append(e)

        if num_failures > 0:
            for msg in failure_msgs:
                print(msg)
            self.fail("Failed to accept valid commands. %d total failures."
                      "Errors above." % num_failures)

    def test_reject_invalid_commands(self):
        """
        For all bad records, attempt to add them and update to them,
        making sure that both operations fail.
        """
        num_failures = 0
        failure_msgs = []

        # Add invalid records and make sure they fail to be added
        for dnstype in self.bad_records:
            for record in self.bad_records[dnstype]:
                try:
                    result, out, err = self.runsubcmd("dns", "add",
                                                      os.environ["SERVER"],
                                                      self.zone, "testrecord",
                                                      dnstype, record,
                                                      self.creds_string)
                    self.assertCmdFail(result, "Successfully added invalid"
                                       "record '%s' of type '%s'."
                                       % (record, dnstype))
                except AssertionError as e:
                    num_failures = num_failures + 1
                    failure_msgs.append(e)
                    self.resetZone()
                try:
                    result, out, err = self.runsubcmd("dns", "delete",
                                                      os.environ["SERVER"],
                                                      self.zone, "testrecord",
                                                      dnstype, record,
                                                      self.creds_string)
                    self.assertCmdFail(result, "Successfully deleted invalid"
                                       "record '%s' of type '%s' which"
                                       "shouldn't exist." % (record, dnstype))
                except AssertionError as e:
                    num_failures = num_failures + 1
                    failure_msgs.append(e)
                    self.resetZone()

        # Update valid records to invalid ones and make sure they
        # fail to be updated
        for dnstype in self.bad_records:
            for bad_record in self.bad_records[dnstype]:
                good_record = self.good_records[dnstype][0]

                try:
                    result, out, err = self.runsubcmd("dns", "add",
                                                      os.environ["SERVER"],
                                                      self.zone, "testrecord",
                                                      dnstype, good_record,
                                                      self.creds_string)
                    self.assertCmdSuccess(result, out, err, "Failed to add "
                                          "record '%s' with type %s."
                                          % (record, dnstype))

                    result, out, err = self.runsubcmd("dns", "update",
                                                      os.environ["SERVER"],
                                                      self.zone, "testrecord",
                                                      dnstype, good_record,
                                                      bad_record,
                                                      self.creds_string)
                    self.assertCmdFail(result, "Successfully updated valid "
                                       "record '%s' of type '%s' to invalid "
                                       "record '%s' of the same type."
                                       % (good_record, dnstype, bad_record))

                    result, out, err = self.runsubcmd("dns", "delete",
                                                      os.environ["SERVER"],
                                                      self.zone, "testrecord",
                                                      dnstype, good_record,
                                                      self.creds_string)
                    self.assertCmdSuccess(result, out, err, "Could not delete "
                                          "valid record '%s' of type '%s'."
                                          % (good_record, dnstype))
                except AssertionError as e:
                    num_failures = num_failures + 1
                    failure_msgs.append(e)
                    self.resetZone()

        if num_failures > 0:
            for msg in failure_msgs:
                print(msg)
            self.fail("Failed to reject invalid commands. %d total failures. "
                      "Errors above." % num_failures)

    def test_update_invalid_type(self):
        """Make sure that a record can't be updated to another type leaving
        the data the same, where that data would be incompatible with
        the new type. This is not always enforced at the C level.

        We don't try with all types, because many types are compatible
        in their representations (e.g. A records could be TXT or CNAME
        records; PTR record values are exactly the same as CNAME
        record values, etc).
        """
        dnstypes = ('A', 'AAAA', 'SRV')
        for dnstype1 in dnstypes:
            record1 = self.good_records[dnstype1][0]
            result, out, err = self.runsubcmd("dns", "add",
                                              os.environ["SERVER"],
                                              self.zone, "testrecord",
                                              dnstype1, record1,
                                              self.creds_string)
            self.assertCmdSuccess(result, out, err, "Failed to add "
                                  "record %s with type %s."
                                  % (record1, dnstype1))

            for dnstype2 in dnstypes:
                if dnstype1 == dnstype2:
                    continue

                record2 = self.good_records[dnstype2][0]

                # Check both ways: Give the current type and try to update,
                # and give the new type and try to update.
                result, out, err = self.runsubcmd("dns", "update",
                                                  os.environ["SERVER"],
                                                  self.zone, "testrecord",
                                                  dnstype1, record1,
                                                  record2, self.creds_string)
                self.assertCmdFail(result, "Successfully updated record '%s' "
                                   "to '%s', even though the latter is of "
                                   "type '%s' where '%s' was expected."
                                   % (record1, record2, dnstype2, dnstype1))

                result, out, err = self.runsubcmd("dns", "update",
                                                  os.environ["SERVER"],
                                                  self.zone, "testrecord",
                                                  dnstype2, record1, record2,
                                                  self.creds_string)
                self.assertCmdFail(result, "Successfully updated record "
                                   "'%s' to '%s', even though the former "
                                   "is of type '%s' where '%s' was expected."
                                   % (record1, record2, dnstype1, dnstype2))

    def test_update_valid_type(self):
        for dnstype in self.good_records:
            for record in self.good_records[dnstype]:
                result, out, err = self.runsubcmd("dns", "add",
                                                  os.environ["SERVER"],
                                                  self.zone, "testrecord",
                                                  dnstype, record,
                                                  self.creds_string)
                self.assertCmdSuccess(result, out, err, "Failed to add "
                                      "record %s with type %s."
                                      % (record, dnstype))

                if record == '.' and dnstype != 'TXT':
                    # This will fail because the update finds a match
                    # for "." that is actually "" (in
                    # dns_record_match()), then uses the "" record in
                    # a call to dns_to_dnsp_convert() which calls
                    # dns_name_check() which rejects "" as a bad DNS
                    # name. Maybe FIXME, maybe not.
                    continue

                # Update the record to be the same.
                result, out, err = self.runsubcmd("dns", "update",
                                                  os.environ["SERVER"],
                                                  self.zone, "testrecord",
                                                  dnstype, record, record,
                                                  self.creds_string)
                self.assertCmdSuccess(result, out, err,
                                      "Could not update record "
                                      "'%s' to be exactly the same." % record)

                result, out, err = self.runsubcmd("dns", "delete",
                                                  os.environ["SERVER"],
                                                  self.zone, "testrecord",
                                                  dnstype, record,
                                                  self.creds_string)
                self.assertCmdSuccess(result, out, err, "Could not delete "
                                      "valid record '%s' of type '%s'."
                                      % (record, dnstype))

        for record in self.good_records["SRV"]:
            result, out, err = self.runsubcmd("dns", "add",
                                              os.environ["SERVER"],
                                              self.zone, "testrecord",
                                              "SRV", record,
                                              self.creds_string)
            self.assertCmdSuccess(result, out, err, "Failed to add "
                                  "record %s with type 'SRV'." % record)

            split = record.split()
            new_bit = str(int(split[3]) + 1)
            new_record = '%s %s %s %s' % (split[0], split[1], split[2], new_bit)

            result, out, err = self.runsubcmd("dns", "update",
                                              os.environ["SERVER"],
                                              self.zone, "testrecord",
                                              "SRV", record,
                                              new_record, self.creds_string)
            self.assertCmdSuccess(result, out, err, "Failed to update record "
                                  "'%s' of type '%s' to '%s'."
                                  % (record, "SRV", new_record))

            result, out, err = self.runsubcmd("dns", "query",
                                              os.environ["SERVER"],
                                              self.zone, "testrecord",
                                              "SRV", self.creds_string)
            self.assertCmdSuccess(result, out, err, "Failed to query for "
                                  "record '%s' of type '%s'."
                                  % (new_record, "SRV"))

            result, out, err = self.runsubcmd("dns", "delete",
                                              os.environ["SERVER"],
                                              self.zone, "testrecord",
                                              "SRV", new_record,
                                              self.creds_string)
            self.assertCmdSuccess(result, out, err, "Could not delete "
                                  "valid record '%s' of type '%s'."
                                  % (new_record, "SRV"))

        # Since 'dns update' takes the current value as a parameter, make sure
        # we can't enter the wrong current value for a given record.
        for dnstype in self.good_records:
            if len(self.good_records[dnstype]) < 3:
                continue  # Not enough records of this type to do this test

            used_record = self.good_records[dnstype][0]
            unused_record = self.good_records[dnstype][1]
            new_record = self.good_records[dnstype][2]

            result, out, err = self.runsubcmd("dns", "add",
                                              os.environ["SERVER"],
                                              self.zone, "testrecord",
                                              dnstype, used_record,
                                              self.creds_string)
            self.assertCmdSuccess(result, out, err, "Failed to add record %s "
                                  "with type %s." % (used_record, dnstype))

            result, out, err = self.runsubcmd("dns", "update",
                                              os.environ["SERVER"],
                                              self.zone, "testrecord",
                                              dnstype, unused_record,
                                              new_record,
                                              self.creds_string)
            self.assertCmdFail(result, "Successfully updated record '%s' "
                               "from '%s' to '%s', even though the given "
                               "source record is incorrect."
                               % (used_record, unused_record, new_record))

    def test_invalid_types(self):
        result, out, err = self.runsubcmd("dns", "add",
                                          os.environ["SERVER"],
                                          self.zone, "testrecord",
                                          "SOA", "test",
                                          self.creds_string)
        self.assertCmdFail(result, "Successfully added record of type SOA, "
                           "when this type should not be available.")
        self.assertTrue("type SOA is not supported" in err,
                        "Invalid error message '%s' when attempting to "
                        "add record of type SOA." % err)

    def test_add_overlapping_different_type(self):
        """
        Make sure that we can add an entry with the same name as an existing one but a different type.
        """

        i = 0
        for dnstype1 in self.good_records:
            record1 = self.good_records[dnstype1][0]
            for dnstype2 in self.good_records:
                # Only do some subset of dns types, otherwise it takes a long time.
                i += 1
                if i % 4 != 0:
                    continue

                if dnstype1 == dnstype2:
                    continue

                record2 = self.good_records[dnstype2][0]

                result, out, err = self.runsubcmd("dns", "add",
                                                  os.environ["SERVER"],
                                                  self.zone, "testrecord",
                                                  dnstype1, record1,
                                                  self.creds_string)
                self.assertCmdSuccess(result, out, err, "Failed to add record "
                                      "'%s' of type '%s'." % (record1, dnstype1))

                result, out, err = self.runsubcmd("dns", "add",
                                                  os.environ["SERVER"],
                                                  self.zone, "testrecord",
                                                  dnstype2, record2,
                                                  self.creds_string)
                self.assertCmdSuccess(result, out, err, "Failed to add record "
                                      "'%s' of type '%s' when a record '%s' "
                                      "of type '%s' with the same name exists."
                                      % (record1, dnstype1, record2, dnstype2))

                result, out, err = self.runsubcmd("dns", "query",
                                                  os.environ["SERVER"],
                                                  self.zone, "testrecord",
                                                  dnstype1, self.creds_string)
                self.assertCmdSuccess(result, out, err, "Failed to query for "
                                      "record '%s' of type '%s' when a new "
                                      "record '%s' of type '%s' with the same "
                                      "name was added."
                                      % (record1, dnstype1, record2, dnstype2))

                result, out, err = self.runsubcmd("dns", "query",
                                                  os.environ["SERVER"],
                                                  self.zone, "testrecord",
                                                  dnstype2, self.creds_string)
                self.assertCmdSuccess(result, out, err, "Failed to query "
                                      "record '%s' of type '%s' which should "
                                      "have been added with the same name as "
                                      "record '%s' of type '%s'."
                                      % (record2, dnstype2, record1, dnstype1))

                result, out, err = self.runsubcmd("dns", "delete",
                                                  os.environ["SERVER"],
                                                  self.zone, "testrecord",
                                                  dnstype1, record1,
                                                  self.creds_string)
                self.assertCmdSuccess(result, out, err, "Failed to delete "
                                      "record '%s' of type '%s'."
                                      % (record1, dnstype1))

                result, out, err = self.runsubcmd("dns", "delete",
                                                  os.environ["SERVER"],
                                                  self.zone, "testrecord",
                                                  dnstype2, record2,
                                                  self.creds_string)
                self.assertCmdSuccess(result, out, err, "Failed to delete "
                                      "record '%s' of type '%s'."
                                      % (record2, dnstype2))

    def test_query_deleted_record(self):
        self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
                       "testrecord", "A", self.testip, self.creds_string)
        self.runsubcmd("dns", "delete", os.environ["SERVER"], self.zone,
                       "testrecord", "A", self.testip, self.creds_string)

        result, out, err = self.runsubcmd("dns", "query",
                                          os.environ["SERVER"],
                                          self.zone, "testrecord",
                                          "A", self.creds_string)
        self.assertCmdFail(result)

    def test_add_duplicate_record(self):
        for record_type in self.good_records:
            result, out, err = self.runsubcmd("dns", "add",
                                              os.environ["SERVER"],
                                              self.zone, "testrecord",
                                              record_type,
                                              self.good_records[record_type][0],
                                              self.creds_string)
            self.assertCmdSuccess(result, out, err)
            result, out, err = self.runsubcmd("dns", "add",
                                              os.environ["SERVER"],
                                              self.zone, "testrecord",
                                              record_type,
                                              self.good_records[record_type][0],
                                              self.creds_string)
            self.assertCmdFail(result)
            result, out, err = self.runsubcmd("dns", "query",
                                              os.environ["SERVER"],
                                              self.zone, "testrecord",
                                              record_type, self.creds_string)
            self.assertCmdSuccess(result, out, err)
            result, out, err = self.runsubcmd("dns", "delete",
                                              os.environ["SERVER"],
                                              self.zone, "testrecord",
                                              record_type,
                                              self.good_records[record_type][0],
                                              self.creds_string)
            self.assertCmdSuccess(result, out, err)

    def test_remove_deleted_record(self):
        self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
                       "testrecord", "A", self.testip, self.creds_string)
        self.runsubcmd("dns", "delete", os.environ["SERVER"], self.zone,
                       "testrecord", "A", self.testip, self.creds_string)

        # Attempting to delete a record that has already been deleted or has never existed should fail
        result, out, err = self.runsubcmd("dns", "delete",
                                          os.environ["SERVER"],
                                          self.zone, "testrecord",
                                          "A", self.testip, self.creds_string)
        self.assertCmdFail(result)
        result, out, err = self.runsubcmd("dns", "query",
                                          os.environ["SERVER"],
                                          self.zone, "testrecord",
                                          "A", self.creds_string)
        self.assertCmdFail(result)
        result, out, err = self.runsubcmd("dns", "delete",
                                          os.environ["SERVER"],
                                          self.zone, "testrecord2",
                                          "A", self.testip, self.creds_string)
        self.assertCmdFail(result)

    def test_cleanup_record(self):
        """
        Test dns cleanup command is working fine.
        """

        # add a A record
        self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
                       'testa', "A", self.testip, self.creds_string)

        # the above A record points to this host
        dnshostname = '{0}.{1}'.format('testa', self.zone.lower())

        # add a CNAME record points to above host
        self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
                       'testcname', "CNAME", dnshostname, self.creds_string)

        # add a NS record
        self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
                       'testns', "NS", dnshostname, self.creds_string)

        # add a PTR record points to above host
        self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
                       'testptr', "PTR", dnshostname, self.creds_string)

        # add a SRV record points to above host
        srv_record = "{0} 65530 65530 65530".format(dnshostname)
        self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
                       'testsrv', "SRV", srv_record, self.creds_string)

        # cleanup record for this dns host
        self.runsubcmd("dns", "cleanup", os.environ["SERVER"],
                       dnshostname, self.creds_string)

        # all records should be marked as dNSTombstoned
        for record_name in ['testa', 'testcname', 'testns', 'testptr', 'testsrv']:

            records = self.samdb.search(
                base="DC=DomainDnsZones,{0}".format(self.samdb.get_default_basedn()),
                scope=ldb.SCOPE_SUBTREE,
                expression="(&(objectClass=dnsNode)(name={0}))".format(record_name),
                attrs=["dNSTombstoned"])

            self.assertEqual(len(records), 1)
            for record in records:
                self.assertEqual(str(record['dNSTombstoned']), 'TRUE')

    def test_cleanup_record_no_A_record(self):
        """
        Test dns cleanup command works with no A record.
        """

        # add a A record
        self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
                       'notesta', "A", self.testip, self.creds_string)

        # the above A record points to this host
        dnshostname = '{0}.{1}'.format('testa', self.zone.lower())

        # add a CNAME record points to above host
        self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
                       'notestcname', "CNAME", dnshostname, self.creds_string)

        # add a NS record
        self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
                       'notestns', "NS", dnshostname, self.creds_string)

        # add a PTR record points to above host
        self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
                       'notestptr', "PTR", dnshostname, self.creds_string)

        # add a SRV record points to above host
        srv_record = "{0} 65530 65530 65530".format(dnshostname)
        self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
                       'notestsrv', "SRV", srv_record, self.creds_string)

        # Remove the initial A record (leading to hanging references)
        self.runsubcmd("dns", "delete", os.environ["SERVER"], self.zone,
                       'notesta', "A", self.testip, self.creds_string)

        # cleanup record for this dns host
        self.runsubcmd("dns", "cleanup", os.environ["SERVER"],
                       dnshostname, self.creds_string)

        # all records should be marked as dNSTombstoned
        for record_name in ['notestcname', 'notestns', 'notestptr', 'notestsrv']:

            records = self.samdb.search(
                base="DC=DomainDnsZones,{0}".format(self.samdb.get_default_basedn()),
                scope=ldb.SCOPE_SUBTREE,
                expression="(&(objectClass=dnsNode)(name={0}))".format(record_name),
                attrs=["dNSTombstoned"])

            self.assertEqual(len(records), 1)
            for record in records:
                self.assertEqual(str(record['dNSTombstoned']), 'TRUE')

    def test_cleanup_multi_srv_record(self):
        """
        Test dns cleanup command for multi-valued SRV record.

        Steps:
        - Add 2 A records host1 and host2
        - Add a SRV record srv1 and points to both host1 and host2
        - Run cleanup command for host1
        - Check records for srv1, data for host1 should be gone and host2 is kept.
        """

        hosts = ['host1', 'host2']  # A record names
        srv_name = 'srv1'

        # add A records
        for host in hosts:
            self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
                           host, "A", self.testip, self.creds_string)

            # the above A record points to this host
            dnshostname = '{0}.{1}'.format(host, self.zone.lower())

            # add a SRV record points to above host
            srv_record = "{0} 65530 65530 65530".format(dnshostname)
            self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
                           srv_name, "SRV", srv_record, self.creds_string)

        records = self.samdb.search(
            base="DC=DomainDnsZones,{0}".format(self.samdb.get_default_basedn()),
            scope=ldb.SCOPE_SUBTREE,
            expression="(&(objectClass=dnsNode)(name={0}))".format(srv_name),
            attrs=['dnsRecord'])
        # should have 2 records here
        self.assertEqual(len(records[0]['dnsRecord']), 2)

        # cleanup record for dns host1
        dnshostname1 = 'host1.{0}'.format(self.zone.lower())
        self.runsubcmd("dns", "cleanup", os.environ["SERVER"],
                       dnshostname1, self.creds_string)

        records = self.samdb.search(
            base="DC=DomainDnsZones,{0}".format(self.samdb.get_default_basedn()),
            scope=ldb.SCOPE_SUBTREE,
            expression="(&(objectClass=dnsNode)(name={0}))".format(srv_name),
            attrs=['dnsRecord', 'dNSTombstoned'])

        # dnsRecord for host1 should be deleted
        self.assertEqual(len(records[0]['dnsRecord']), 1)

        # unpack data
        dns_record_bin = records[0]['dnsRecord'][0]
        dns_record_obj = ndr_unpack(dnsp.DnssrvRpcRecord, dns_record_bin)

        # dnsRecord for host2 is still there and is the only one
        dnshostname2 = 'host2.{0}'.format(self.zone.lower())
        self.assertEqual(dns_record_obj.data.nameTarget, dnshostname2)

        # assert that the record isn't spuriously tombstoned
        self.assertTrue('dNSTombstoned' not in records[0] or
                        str(records[0]['dNSTombstoned']) == 'FALSE')

    def test_dns_wildcards(self):
        """
        Ensure that DNS wild card entries can be added deleted and queried
        """
        num_failures = 0
        failure_msgs = []
        records = [("*.", "MISS", "A", "1.1.1.1"),
                   ("*.SAMDOM", "MISS.SAMDOM", "A", "1.1.1.2")]
        for (name, miss, dnstype, record) in records:
            try:
                result, out, err = self.runsubcmd("dns", "add",
                                                  os.environ["SERVER"],
                                                  self.zone, name,
                                                  dnstype, record,
                                                  self.creds_string)
                self.assertCmdSuccess(
                    result,
                    out,
                    err,
                    ("Failed to add record %s (%s) with type %s."
                     % (name, record, dnstype)))

                result, out, err = self.runsubcmd("dns", "query",
                                                  os.environ["SERVER"],
                                                  self.zone, name,
                                                  dnstype,
                                                  self.creds_string)
                self.assertCmdSuccess(
                    result,
                    out,
                    err,
                    ("Failed to query record %s with qualifier %s."
                     % (record, dnstype)))

                # dns tool does not perform dns wildcard search if the name
                # does not match
                result, out, err = self.runsubcmd("dns", "query",
                                                  os.environ["SERVER"],
                                                  self.zone, miss,
                                                  dnstype,
                                                  self.creds_string)
                self.assertCmdFail(
                    result,
                    ("Failed to query record %s with qualifier %s."
                     % (record, dnstype)))

                result, out, err = self.runsubcmd("dns", "delete",
                                                  os.environ["SERVER"],
                                                  self.zone, name,
                                                  dnstype, record,
                                                  self.creds_string)
                self.assertCmdSuccess(
                    result,
                    out,
                    err,
                    ("Failed to remove record %s with type %s."
                     % (record, dnstype)))
            except AssertionError as e:
                num_failures = num_failures + 1
                failure_msgs.append(e)

        if num_failures > 0:
            for msg in failure_msgs:
                print(msg)
            self.fail("Failed to accept valid commands. %d total failures."
                      "Errors above." % num_failures)

    def test_serverinfo(self):
        for v in ['w2k', 'dotnet', 'longhorn']:
            result, out, err = self.runsubcmd("dns",
                                              "serverinfo",
                                              "--client-version", v,
                                              os.environ["SERVER"],
                                              self.creds_string)
            self.assertCmdSuccess(result,
                                  out,
                                  err,
                                  "Failed to print serverinfo with "
                                  "client version %s" % v)
            self.assertTrue(out != '')

    def test_zoneinfo(self):
        result, out, err = self.runsubcmd("dns",
                                          "zoneinfo",
                                          os.environ["SERVER"],
                                          self.zone,
                                          self.creds_string)
        self.assertCmdSuccess(result,
                              out,
                              err,
                              "Failed to print zoneinfo")
        self.assertTrue(out != '')

    def test_zoneoptions_aging(self):
        for options, vals, error in (
                (['--aging=1'], {'fAging': 'TRUE'}, False),
                (['--aging=0'], {'fAging': 'FALSE'}, False),
                (['--aging=-1'], {'fAging': 'FALSE'}, True),
                (['--aging=2'], {}, True),
                (['--aging=2', '--norefreshinterval=1'], {}, True),
                (['--aging=1', '--norefreshinterval=1'],
                 {'fAging': 'TRUE', 'dwNoRefreshInterval': '1'}, False),
                (['--aging=1', '--norefreshinterval=0'],
                 {'fAging': 'TRUE', 'dwNoRefreshInterval': '0'}, False),
                (['--aging=0', '--norefreshinterval=99', '--refreshinterval=99'],
                 {'fAging': 'FALSE',
                  'dwNoRefreshInterval': '99',
                  'dwRefreshInterval': '99'}, False),
                (['--aging=0', '--norefreshinterval=-99', '--refreshinterval=99'],
                 {}, True),
                (['--refreshinterval=9999999'], {}, True),
                (['--norefreshinterval=9999999'], {}, True),
                ):
            result, out, err = self.runsubcmd("dns",
                                              "zoneoptions",
                                              os.environ["SERVER"],
                                              self.zone,
                                              self.creds_string,
                                              *options)
            if error:
                self.assertCmdFail(result, "zoneoptions should fail")
            else:
                self.assertCmdSuccess(result,
                                      out,
                                      err,
                                      "zoneoptions shouldn't fail")


            info_r, info_out, info_err = self.runsubcmd("dns",
                                                        "zoneinfo",
                                                        os.environ["SERVER"],
                                                        self.zone,
                                                        self.creds_string)

            self.assertCmdSuccess(info_r,
                                  info_out,
                                  info_err,
                                  "zoneinfo shouldn't fail after zoneoptions")

            info = {k: v for k, v in re.findall(r'^\s*(\w+)\s*:\s*(\w+)\s*$',
                                                info_out,
                                                re.MULTILINE)}
            for k, v in vals.items():
                self.assertIn(k, info)
                self.assertEqual(v, info[k])


    def ldap_add_node_with_records(self, name, records):
        dn = (f"DC={name},DC={self.zone},CN=MicrosoftDNS,DC=DomainDNSZones,"
              f"{self.samdb.get_default_basedn()}")

        dns_records = []
        for r in records:
            rec = dnsp.DnssrvRpcRecord()
            rec.wType = r.get('wType', dnsp.DNS_TYPE_A)
            rec.rank = dnsp.DNS_RANK_ZONE
            rec.dwTtlSeconds = 900
            rec.dwTimeStamp = r.get('dwTimeStamp', 0)
            rec.data = r.get('data', '10.10.10.10')
            dns_records.append(ndr_pack(rec))

        msg = ldb.Message.from_dict(self.samdb,
                                    {'dn': dn,
                                     "objectClass": ["top", "dnsNode"],
                                     'dnsRecord': dns_records
                                    })
        self.samdb.add(msg)

    def get_timestamp_map(self):
        re_wtypes = (dnsp.DNS_TYPE_A,
                     dnsp.DNS_TYPE_AAAA,
                     dnsp.DNS_TYPE_TXT)

        t = time.time()
        now = dsdb_dns.unix_to_dns_timestamp(int(t))

        records = self.get_all_records(self.zone)
        tsmap = {}
        for k, recs in records.items():
            m = []
            tsmap[k] = m
            for rec in recs:
                r = ndr_unpack(dnsp.DnssrvRpcRecord, rec)
                timestamp = r.dwTimeStamp
                if abs(timestamp - now) < 3:
                    timestamp = 'nowish'

                if r.wType in re_wtypes:
                    m.append(('R', timestamp))
                else:
                    m.append(('-', timestamp))

        return tsmap


    def test_zoneoptions_mark_records(self):
        self.maxDiff = 10000
        # We need a number of records to work with, so we'll use part
        # of our known good records list, using three different names
        # to test the regex. All these records will be static.
        for dnstype in self.good_records:
            for record in self.good_records[dnstype][:2]:
                self.runsubcmd("dns", "add",
                               os.environ["SERVER"],
                               self.zone, "frobitz",
                               dnstype, record,
                               self.creds_string)
                self.runsubcmd("dns", "add",
                               os.environ["SERVER"],
                               self.zone, "weergly",
                               dnstype, record,
                               self.creds_string)
                self.runsubcmd("dns", "add",
                               os.environ["SERVER"],
                               self.zone, "snizle",
                               dnstype, record,
                               self.creds_string)

        # and we also want some that aren't static, and some mixed
        # static/dynamic records.
        # timestamps are in hours since 1601; now ~= 3.7 million
        for ts in (0, 100, 10 ** 6, 10 ** 7):
            name = f"ts-{ts}"
            self.ldap_add_node_with_records(name, [{"dwTimeStamp": ts}])

        recs = []
        for ts in (0, 100, 10 ** 6, 10 ** 7):
            addr = f'10.{(ts >> 16) & 255}.{(ts >> 8) & 255}.{ts & 255}'
            recs.append({"dwTimeStamp": ts, "data": addr})

        self.ldap_add_node_with_records("ts-multi", recs)

        # get the state of ALL records.
        # then we make assertions about the diffs, keeping track of
        # the current state.

        tsmap = self.get_timestamp_map()



        for options, diff, output_substrings, error in (
                # --mark-old-records-static
                # --mark-records-static-regex
                # --mark-records-dynamic-regex
                (
                    ['--mark-old-records-static=1971-13-04'],
                    {},
                    [],
                    "bad date"
                ),
                (
                    # using --dry-run, should be no change, but output.
                    ['--mark-old-records-static=1971-03-04', '--dry-run'],
                    {},
                    [
                        "would make 1/1 records static on ts-1000000.zone.",
                        "would make 1/1 records static on ts-100.zone.",
                        "would make 2/4 records static on ts-multi.zone.",
                    ],
                    False
                ),
                (
                    # timestamps < ~ 3.25 million are now static
                    ['--mark-old-records-static=1971-03-04'],
                    {
                        'ts-100': [('R', 0)],
                        'ts-1000000': [('R', 0)],
                        'ts-multi': [('R', 0), ('R', 0), ('R', 0), ('R', 10000000)]
                    },
                    [
                        "made 1/1 records static on ts-1000000.zone.",
                        "made 1/1 records static on ts-100.zone.",
                        "made 2/4 records static on ts-multi.zone.",
                    ],
                    False
                ),
                (
                    # no change, old records already static
                    ['--mark-old-records-static=1972-03-04'],
                    {},
                    [],
                    False
                ),
                (
                    # no change, samba-tool added records already static
                    ['--mark-records-static-regex=sniz'],
                    {},
                    [],
                    False
                ),
                (
                    # snizle has 2 A, 2 AAAA, 10 fancy, and 2 TXT records, in
                    # that order.
                    # the A, AAAA, and TXT records should be dynamic
                    ['--mark-records-dynamic-regex=sniz'],
                    {'snizle': [('R', 'nowish'),
                                ('R', 'nowish'),
                                ('R', 'nowish'),
                                ('R', 'nowish'),
                                ('-', 0),
                                ('-', 0),
                                ('-', 0),
                                ('-', 0),
                                ('-', 0),
                                ('-', 0),
                                ('-', 0),
                                ('-', 0),
                                ('-', 0),
                                ('-', 0),
                                ('R', 'nowish'),
                                ('R', 'nowish')]
                    },
                    ['made 6/16 records dynamic on snizle.zone.'],
                    False
                ),
                (
                    # This regex should catch snizle, weergly, and ts-*
                    # but we're doing dry-run so no change
                    ['--mark-records-dynamic-regex=[sw]', '-n'],
                    {},
                    ['would make 3/4 records dynamic on ts-multi.zone.',
                     'would make 1/1 records dynamic on ts-0.zone.',
                     'would make 1/1 records dynamic on ts-1000000.zone.',
                     'would make 6/16 records dynamic on weergly.zone.',
                     'would make 1/1 records dynamic on ts-100.zone.'
                    ],
                    False
                ),
                (
                    # This regex should catch snizle and frobitz
                    # but snizle has already been changed.
                    ['--mark-records-dynamic-regex=z'],
                    {'frobitz': [('R', 'nowish'),
                                 ('R', 'nowish'),
                                 ('R', 'nowish'),
                                 ('R', 'nowish'),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('R', 'nowish'),
                                 ('R', 'nowish')]
                    },
                    ['made 6/16 records dynamic on frobitz.zone.'],
                    False
                ),
                (
                    # This regex should catch snizle, frobitz, and
                    # ts-multi. Note that the 1e7 ts-multi record is
                    # already dynamic and doesn't change.
                    ['--mark-records-dynamic-regex=[i]'],
                    {'ts-multi': [('R', 'nowish'),
                                  ('R', 'nowish'),
                                  ('R', 'nowish'),
                                  ('R', 10000000)]
                    },
                    ['made 3/4 records dynamic on ts-multi.zone.'],
                    False
                ),
                (
                    # matches no records
                    ['--mark-records-dynamic-regex=^aloooooo[qw]+'],
                    {},
                    [],
                    False
                ),
                (
                    # This should be an error, as only one --mark-*
                    # argument is allowed at a time
                    ['--mark-records-dynamic-regex=.',
                     '--mark-records-static-regex=.',
                    ],
                    {},
                    [],
                    True
                ),
                (
                    # This should also be an error
                    ['--mark-old-records-static=1997-07-07',
                     '--mark-records-static-regex=.',
                    ],
                    {},
                    [],
                    True
                ),
                (
                    # This should not be an error. --aging and refresh
                    # options can be mixed with --mark ones.
                    ['--mark-old-records-static=1997-07-07',
                     '--aging=0',
                    ],
                    {},
                    ['Set Aging to 0'],
                    False
                ),
                (
                    # This regex should catch weergly, but all the
                    # records are already static,
                    ['--mark-records-static-regex=wee'],
                    {},
                    [],
                    False
                ),
                (
                    # Make frobitz static again.
                    ['--mark-records-static-regex=obi'],
                    {'frobitz': [('R', 0),
                                 ('R', 0),
                                 ('R', 0),
                                 ('R', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('R', 0),
                                 ('R', 0)]
                    },
                    ['made 6/16 records static on frobitz.zone.'],
                    False
                ),
                (
                    # would make almost everything static, but --dry-run
                    ['--mark-old-records-static=2222-03-04', '--dry-run'],
                    {},
                    [
                        'would make 6/16 records static on snizle.zone.',
                        'would make 3/4 records static on ts-multi.zone.'
                    ],
                    False
                ),
                (
                    # make everything static
                    ['--mark-records-static-regex=.'],
                     {'snizle': [('R', 0),
                                 ('R', 0),
                                 ('R', 0),
                                 ('R', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('R', 0),
                                 ('R', 0)],
                      'ts-10000000': [('R', 0)],
                      'ts-multi': [('R', 0), ('R', 0), ('R', 0), ('R', 0)]
                     },
                    [
                        'made 4/4 records static on ts-multi.zone.',
                        'made 1/1 records static on ts-10000000.zone.',
                        'made 6/16 records static on snizle.zone.',
                    ],
                    False
                ),
                (
                    # make everything dynamic that can be
                    ['--mark-records-dynamic-regex=.'],
                    {'frobitz': [('R', 'nowish'),
                                 ('R', 'nowish'),
                                 ('R', 'nowish'),
                                 ('R', 'nowish'),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('R', 'nowish'),
                                 ('R', 'nowish')],
                     'snizle': [('R', 'nowish'),
                                ('R', 'nowish'),
                                ('R', 'nowish'),
                                ('R', 'nowish'),
                                ('-', 0),
                                ('-', 0),
                                ('-', 0),
                                ('-', 0),
                                ('-', 0),
                                ('-', 0),
                                ('-', 0),
                                ('-', 0),
                                ('-', 0),
                                ('-', 0),
                                ('R', 'nowish'),
                                ('R', 'nowish')],
                     'ts-0': [('R', 'nowish')],
                     'ts-100': [('R', 'nowish')],
                     'ts-1000000': [('R', 'nowish')],
                     'ts-10000000': [('R', 'nowish')],
                     'ts-multi': [('R', 'nowish'),
                                  ('R', 'nowish'),
                                  ('R', 'nowish'),
                                  ('R', 'nowish')],
                     'weergly': [('R', 'nowish'),
                                 ('R', 'nowish'),
                                 ('R', 'nowish'),
                                 ('R', 'nowish'),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('-', 0),
                                 ('R', 'nowish'),
                                 ('R', 'nowish')]
                    },
                    [
                        'made 4/4 records dynamic on ts-multi.zone.',
                        'made 6/16 records dynamic on snizle.zone.',
                        'made 1/1 records dynamic on ts-0.zone.',
                        'made 1/1 records dynamic on ts-1000000.zone.',
                        'made 1/1 records dynamic on ts-10000000.zone.',
                        'made 1/1 records dynamic on ts-100.zone.',
                        'made 6/16 records dynamic on frobitz.zone.',
                        'made 6/16 records dynamic on weergly.zone.',
                    ],
                    False
                ),
            ):
            result, out, err = self.runsubcmd("dns",
                                              "zoneoptions",
                                              os.environ["SERVER"],
                                              self.zone,
                                              self.creds_string,
                                              *options)
            if error:
                self.assertCmdFail(result, f"zoneoptions should fail ({error})")
            else:
                self.assertCmdSuccess(result,
                                      out,
                                      err,
                                      "zoneoptions shouldn't fail")

            new_tsmap = self.get_timestamp_map()

            # same keys, always
            self.assertEqual(sorted(new_tsmap), sorted(tsmap))
            changes = {}
            for k in tsmap:
                if tsmap[k] != new_tsmap[k]:
                    changes[k] = new_tsmap[k]

            self.assertEqual(diff, changes)

            for s in output_substrings:
                self.assertIn(s, out)
            tsmap = new_tsmap

    def test_zonecreate_dns_domain_directory_partition(self):
        zone = "test-dns-domain-dp-zone"
        dns_dp_opt = "--dns-directory-partition=domain"

        result, out, err = self.runsubcmd("dns",
                                          "zonecreate",
                                          os.environ["SERVER"],
                                          zone,
                                          self.creds_string,
                                          dns_dp_opt)
        self.assertCmdSuccess(result,
                              out,
                              err,
                              "Failed to create zone with "
                              "--dns-directory-partition option")
        self.assertTrue('Zone %s created successfully' % zone in out,
                        "Unexpected output: %s")

        result, out, err = self.runsubcmd("dns",
                                          "zoneinfo",
                                          os.environ["SERVER"],
                                          zone,
                                          self.creds_string)
        self.assertCmdSuccess(result, out, err)
        self.assertTrue("DNS_DP_DOMAIN_DEFAULT" in out,
                        "Missing DNS_DP_DOMAIN_DEFAULT flag")

        result, out, err = self.runsubcmd("dns",
                                          "zonedelete",
                                          os.environ["SERVER"],
                                          zone,
                                          self.creds_string)
        self.assertCmdSuccess(result, out, err,
                              "Failed to delete zone in domain DNS directory "
                              "partition")
        result, out, err = self.runsubcmd("dns",
                                          "zonelist",
                                          os.environ["SERVER"],
                                          self.creds_string)
        self.assertCmdSuccess(result, out, err,
                              "Failed to delete zone in domain DNS directory "
                              "partition")
        self.assertTrue(zone not in out,
                        "Deleted zone still exists")

    def test_zonecreate_dns_forest_directory_partition(self):
        zone = "test-dns-forest-dp-zone"
        dns_dp_opt = "--dns-directory-partition=forest"

        result, out, err = self.runsubcmd("dns",
                                          "zonecreate",
                                          os.environ["SERVER"],
                                          zone,
                                          self.creds_string,
                                          dns_dp_opt)
        self.assertCmdSuccess(result,
                              out,
                              err,
                              "Failed to create zone with "
                              "--dns-directory-partition option")
        self.assertTrue('Zone %s created successfully' % zone in out,
                        "Unexpected output: %s")

        result, out, err = self.runsubcmd("dns",
                                          "zoneinfo",
                                          os.environ["SERVER"],
                                          zone,
                                          self.creds_string)
        self.assertCmdSuccess(result, out, err)
        self.assertTrue("DNS_DP_FOREST_DEFAULT" in out,
                        "Missing DNS_DP_FOREST_DEFAULT flag")

        result, out, err = self.runsubcmd("dns",
                                          "zonedelete",
                                          os.environ["SERVER"],
                                          zone,
                                          self.creds_string)
        self.assertCmdSuccess(result, out, err,
                              "Failed to delete zone in forest DNS directory "
                              "partition")

        result, out, err = self.runsubcmd("dns",
                                          "zonelist",
                                          os.environ["SERVER"],
                                          self.creds_string)
        self.assertCmdSuccess(result, out, err,
                              "Failed to delete zone in forest DNS directory "
                              "partition")
        self.assertTrue(zone not in out,
                        "Deleted zone still exists")
