# coding: utf-8

import struct

import pytest

from pandas.compat import OrderedDict, u

from pandas import compat

from pandas.io.msgpack import Packer, Unpacker, packb, unpackb


class TestPack(object):

    def check(self, data, use_list=False):
        re = unpackb(packb(data), use_list=use_list)
        assert re == data

    def testPack(self):
        test_data = [
            0, 1, 127, 128, 255, 256, 65535, 65536,
            -1, -32, -33, -128, -129, -32768, -32769,
            1.0,
            b"", b"a", b"a" * 31, b"a" * 32,
            None, True, False,
            (), ((),), ((), None,),
            {None: 0},
            (1 << 23),
        ]
        for td in test_data:
            self.check(td)

    def testPackUnicode(self):
        test_data = [u(""), u("abcd"), [u("defgh")], u("Русский текст"), ]
        for td in test_data:
            re = unpackb(
                packb(td, encoding='utf-8'), use_list=1, encoding='utf-8')
            assert re == td
            packer = Packer(encoding='utf-8')
            data = packer.pack(td)
            re = Unpacker(
                compat.BytesIO(data), encoding='utf-8', use_list=1).unpack()
            assert re == td

    def testPackUTF32(self):
        test_data = [
            compat.u(""),
            compat.u("abcd"),
            [compat.u("defgh")],
            compat.u("Русский текст"),
        ]
        for td in test_data:
            re = unpackb(
                packb(td, encoding='utf-32'), use_list=1, encoding='utf-32')
            assert re == td

    def testPackBytes(self):
        test_data = [b"", b"abcd", (b"defgh", ), ]
        for td in test_data:
            self.check(td)

    def testIgnoreUnicodeErrors(self):
        re = unpackb(
            packb(b'abc\xeddef'), encoding='utf-8', unicode_errors='ignore',
            use_list=1)
        assert re == "abcdef"

    def testStrictUnicodeUnpack(self):
        msg = (r"'utf-*8' codec can't decode byte 0xed in position 3:"
               " invalid continuation byte")
        with pytest.raises(UnicodeDecodeError, match=msg):
            unpackb(packb(b'abc\xeddef'), encoding='utf-8', use_list=1)

    def testStrictUnicodePack(self):
        msg = (r"'ascii' codec can't encode character u*'\\xed' in position 3:"
               r" ordinal not in range\(128\)")
        with pytest.raises(UnicodeEncodeError, match=msg):
            packb(compat.u("abc\xeddef"), encoding='ascii',
                  unicode_errors='strict')

    def testIgnoreErrorsPack(self):
        re = unpackb(
            packb(
                compat.u("abcФФФdef"), encoding='ascii',
                unicode_errors='ignore'), encoding='utf-8', use_list=1)
        assert re == compat.u("abcdef")

    def testNoEncoding(self):
        msg = "Can't encode unicode string: no encoding is specified"
        with pytest.raises(TypeError, match=msg):
            packb(compat.u("abc"), encoding=None)

    def testDecodeBinary(self):
        re = unpackb(packb("abc"), encoding=None, use_list=1)
        assert re == b"abc"

    def testPackFloat(self):
        assert packb(1.0,
                     use_single_float=True) == b'\xca' + struct.pack('>f', 1.0)
        assert packb(
            1.0, use_single_float=False) == b'\xcb' + struct.pack('>d', 1.0)

    def testArraySize(self, sizes=[0, 5, 50, 1000]):
        bio = compat.BytesIO()
        packer = Packer()
        for size in sizes:
            bio.write(packer.pack_array_header(size))
            for i in range(size):
                bio.write(packer.pack(i))

        bio.seek(0)
        unpacker = Unpacker(bio, use_list=1)
        for size in sizes:
            assert unpacker.unpack() == list(range(size))

    def test_manualreset(self, sizes=[0, 5, 50, 1000]):
        packer = Packer(autoreset=False)
        for size in sizes:
            packer.pack_array_header(size)
            for i in range(size):
                packer.pack(i)

        bio = compat.BytesIO(packer.bytes())
        unpacker = Unpacker(bio, use_list=1)
        for size in sizes:
            assert unpacker.unpack() == list(range(size))

        packer.reset()
        assert packer.bytes() == b''

    def testMapSize(self, sizes=[0, 5, 50, 1000]):
        bio = compat.BytesIO()
        packer = Packer()
        for size in sizes:
            bio.write(packer.pack_map_header(size))
            for i in range(size):
                bio.write(packer.pack(i))  # key
                bio.write(packer.pack(i * 2))  # value

        bio.seek(0)
        unpacker = Unpacker(bio)
        for size in sizes:
            assert unpacker.unpack() == {i: i * 2 for i in range(size)}

    def test_odict(self):
        seq = [(b'one', 1), (b'two', 2), (b'three', 3), (b'four', 4)]
        od = OrderedDict(seq)
        assert unpackb(packb(od), use_list=1) == dict(seq)

        def pair_hook(seq):
            return list(seq)

        assert unpackb(
            packb(od), object_pairs_hook=pair_hook, use_list=1) == seq

    def test_pairlist(self):
        pairlist = [(b'a', 1), (2, b'b'), (b'foo', b'bar')]
        packer = Packer()
        packed = packer.pack_map_pairs(pairlist)
        unpacked = unpackb(packed, object_pairs_hook=list)
        assert pairlist == unpacked
