from io import BytesIO
import os
import tempfile

import pytest

from .chunker import cf
from ..chunker import Chunker, ChunkerFixed, sparsemap, has_seek_hole
from ..constants import *  # NOQA

BS = 4096  # fs block size

# some sparse files. X = content blocks, _ = sparse blocks.
# X__XXX____
map_sparse1 = [
    (0 * BS, 1 * BS, True),
    (1 * BS, 2 * BS, False),
    (3 * BS, 3 * BS, True),
    (6 * BS, 4 * BS, False),
]

# _XX___XXXX
map_sparse2 = [
    (0 * BS, 1 * BS, False),
    (1 * BS, 2 * BS, True),
    (3 * BS, 3 * BS, False),
    (6 * BS, 4 * BS, True),
]

# XXX
map_notsparse = [(0 * BS, 3 * BS, True), ]

# ___
map_onlysparse = [(0 * BS, 3 * BS, False), ]


def make_sparsefile(fname, sparsemap, header_size=0):
    with open(fname, 'wb') as fd:
        total = 0
        if header_size:
            fd.write(b'H' * header_size)
            total += header_size
        for offset, size, is_data in sparsemap:
            if is_data:
                fd.write(b'X' * size)
            else:
                fd.seek(size, os.SEEK_CUR)
            total += size
        fd.truncate(total)
    assert os.path.getsize(fname) == total


def make_content(sparsemap, header_size=0):
    result = []
    total = 0
    if header_size:
        result.append(b'H' * header_size)
        total += header_size
    for offset, size, is_data in sparsemap:
        if is_data:
            result.append(b'X' * size)  # bytes!
        else:
            result.append(size)  # int!
        total += size
    return result


def fs_supports_sparse():
    if not has_seek_hole:
        return False
    with tempfile.TemporaryDirectory() as tmpdir:
        fn = os.path.join(tmpdir, 'test_sparse')
        make_sparsefile(fn, [(0, BS, False), (BS, BS, True)])
        with open(fn, 'rb') as f:
            try:
                offset_hole = f.seek(0, os.SEEK_HOLE)
                offset_data = f.seek(0, os.SEEK_DATA)
            except OSError:
                # no sparse support if these seeks do not work
                return False
        return offset_hole == 0 and offset_data == BS


@pytest.mark.skipif(not fs_supports_sparse(), reason='fs does not support sparse files')
@pytest.mark.parametrize("fname, sparse_map", [
    ('sparse1', map_sparse1),
    ('sparse2', map_sparse2),
    ('onlysparse', map_onlysparse),
    ('notsparse', map_notsparse),
])
def test_sparsemap(tmpdir, fname, sparse_map):

    def get_sparsemap_fh(fname):
        fh = os.open(fname, flags=os.O_RDONLY)
        try:
            return list(sparsemap(fh=fh))
        finally:
            os.close(fh)

    def get_sparsemap_fd(fname):
        with open(fname, 'rb') as fd:
            return list(sparsemap(fd=fd))

    fn = str(tmpdir / fname)
    make_sparsefile(fn, sparse_map)
    assert get_sparsemap_fh(fn) == sparse_map
    assert get_sparsemap_fd(fn) == sparse_map


@pytest.mark.skipif(not fs_supports_sparse(), reason='fs does not support sparse files')
@pytest.mark.parametrize("fname, sparse_map, header_size, sparse", [
    ('sparse1', map_sparse1, 0, False),
    ('sparse1', map_sparse1, 0, True),
    ('sparse1', map_sparse1, BS, False),
    ('sparse1', map_sparse1, BS, True),
    ('sparse2', map_sparse2, 0, False),
    ('sparse2', map_sparse2, 0, True),
    ('sparse2', map_sparse2, BS, False),
    ('sparse2', map_sparse2, BS, True),
    ('onlysparse', map_onlysparse, 0, False),
    ('onlysparse', map_onlysparse, 0, True),
    ('onlysparse', map_onlysparse, BS, False),
    ('onlysparse', map_onlysparse, BS, True),
    ('notsparse', map_notsparse, 0, False),
    ('notsparse', map_notsparse, 0, True),
    ('notsparse', map_notsparse, BS, False),
    ('notsparse', map_notsparse, BS, True),
])
def test_chunkify_sparse(tmpdir, fname, sparse_map, header_size, sparse):

    def get_chunks(fname, sparse, header_size):
        chunker = ChunkerFixed(4096, header_size=header_size, sparse=sparse)
        with open(fname, 'rb') as fd:
            return cf(chunker.chunkify(fd))

    fn = str(tmpdir / fname)
    make_sparsefile(fn, sparse_map, header_size=header_size)
    get_chunks(fn, sparse=sparse, header_size=header_size) == make_content(sparse_map, header_size=header_size)


def test_buzhash_chunksize_distribution():
    data = os.urandom(1048576)
    min_exp, max_exp, mask = 10, 16, 14  # chunk size target 16kiB, clip at 1kiB and 64kiB
    chunker = Chunker(0, min_exp, max_exp, mask, 4095)
    f = BytesIO(data)
    chunks = cf(chunker.chunkify(f))
    del chunks[-1]  # get rid of the last chunk, it can be smaller than 2**min_exp
    chunk_sizes = [len(chunk) for chunk in chunks]
    chunks_count = len(chunks)
    min_chunksize_observed = min(chunk_sizes)
    max_chunksize_observed = max(chunk_sizes)
    min_count = sum((int(size == 2 ** min_exp) for size in chunk_sizes))
    max_count = sum((int(size == 2 ** max_exp) for size in chunk_sizes))
    print(f"count: {chunks_count} min: {min_chunksize_observed} max: {max_chunksize_observed} "
          f"min count: {min_count} max count: {max_count}")
    # usually there will about 64 chunks
    assert 32 < chunks_count < 128
    # chunks always must be between min and max (clipping must work):
    assert min_chunksize_observed >= 2 ** min_exp
    assert max_chunksize_observed <= 2 ** max_exp
    # most chunks should be cut due to buzhash triggering, not due to clipping at min/max size:
    assert min_count < 10
    assert max_count < 10
