from __future__ import print_function, division, absolute_import

import mock

import pytest
from toolz import first
import tornado

from distributed import Client
from distributed.utils_test import cluster, mock_ipython
from distributed.utils_test import loop, zmq_ctx  # noqa F401


def need_functional_ipython(func):
    try:
        import ipykernel  # noqa: F401
        import jupyter_client  # noqa: F401
    except ImportError:
        return pytest.mark.skip("need ipykernel and jupyter_client installed")(func)
    if tornado.version_info >= (5,):
        # https://github.com/ipython/ipykernel/issues/277
        return pytest.mark.skip("IPython kernel broken with Tornado 5")(func)
    else:
        return func


@pytest.mark.ipython
@need_functional_ipython
def test_start_ipython_workers(loop, zmq_ctx):
    from jupyter_client import BlockingKernelClient

    with cluster(1) as (s, [a]):
        with Client(s['address'], loop=loop) as e:
            info_dict = e.start_ipython_workers()
            info = first(info_dict.values())
            key = info.pop('key')
            kc = BlockingKernelClient(**info)
            kc.session.key = key
            kc.start_channels()
            kc.wait_for_ready(timeout=10)
            msg_id = kc.execute("worker")
            reply = kc.get_shell_msg(timeout=10)
            assert reply['parent_header']['msg_id'] == msg_id
            assert reply['content']['status'] == 'ok'
            kc.stop_channels()


@pytest.mark.ipython
@need_functional_ipython
def test_start_ipython_scheduler(loop, zmq_ctx):
    from jupyter_client import BlockingKernelClient

    with cluster(1) as (s, [a]):
        with Client(s['address'], loop=loop) as e:
            info = e.start_ipython_scheduler()
            key = info.pop('key')
            kc = BlockingKernelClient(**info)
            kc.session.key = key
            kc.start_channels()
            msg_id = kc.execute("scheduler")
            reply = kc.get_shell_msg(timeout=10)
            kc.stop_channels()


@pytest.mark.ipython
@need_functional_ipython
def test_start_ipython_scheduler_magic(loop, zmq_ctx):
    with cluster(1) as (s, [a]):
        with Client(s['address'], loop=loop) as e, mock_ipython() as ip:
            info = e.start_ipython_scheduler()

        expected = [
            {'magic_kind': 'line', 'magic_name': 'scheduler'},
            {'magic_kind': 'cell', 'magic_name': 'scheduler'},
        ]

        call_kwargs_list = [kwargs for (args, kwargs) in ip.register_magic_function.call_args_list]
        assert call_kwargs_list == expected
        magic = ip.register_magic_function.call_args_list[0][0][0]
        magic(line="", cell="scheduler")


@pytest.mark.ipython
@need_functional_ipython
def test_start_ipython_workers_magic(loop, zmq_ctx):
    with cluster(2) as (s, [a, b]):

        with Client(s['address'], loop=loop) as e, mock_ipython() as ip:
            workers = list(e.ncores())[:2]
            names = ['magic%i' % i for i in range(len(workers))]
            info_dict = e.start_ipython_workers(workers, magic_names=names)

        expected = [
            {'magic_kind': 'line', 'magic_name': 'remote'},
            {'magic_kind': 'cell', 'magic_name': 'remote'},
            {'magic_kind': 'line', 'magic_name': 'magic0'},
            {'magic_kind': 'cell', 'magic_name': 'magic0'},
            {'magic_kind': 'line', 'magic_name': 'magic1'},
            {'magic_kind': 'cell', 'magic_name': 'magic1'},
        ]
        call_kwargs_list = [kwargs for (args, kwargs) in ip.register_magic_function.call_args_list]
        assert call_kwargs_list == expected
        assert ip.register_magic_function.call_count == 6
        magics = [args[0][0] for args in ip.register_magic_function.call_args_list[2:]]
        magics[-1](line="", cell="worker")
        [m.client.stop_channels() for m in magics]


@pytest.mark.ipython
@need_functional_ipython
def test_start_ipython_workers_magic_asterix(loop, zmq_ctx):
    with cluster(2) as (s, [a, b]):

        with Client(s['address'], loop=loop) as e, mock_ipython() as ip:
            workers = list(e.ncores())[:2]
            info_dict = e.start_ipython_workers(workers, magic_names='magic_*')

        expected = [
            {'magic_kind': 'line', 'magic_name': 'remote'},
            {'magic_kind': 'cell', 'magic_name': 'remote'},
            {'magic_kind': 'line', 'magic_name': 'magic_0'},
            {'magic_kind': 'cell', 'magic_name': 'magic_0'},
            {'magic_kind': 'line', 'magic_name': 'magic_1'},
            {'magic_kind': 'cell', 'magic_name': 'magic_1'},
        ]
        call_kwargs_list = [kwargs for (args, kwargs) in ip.register_magic_function.call_args_list]
        assert call_kwargs_list == expected
        assert ip.register_magic_function.call_count == 6
        magics = [args[0][0] for args in ip.register_magic_function.call_args_list[2:]]
        magics[-1](line="", cell="worker")
        [m.client.stop_channels() for m in magics]


@pytest.mark.ipython
@need_functional_ipython
def test_start_ipython_remote(loop, zmq_ctx):
    from distributed._ipython_utils import remote_magic
    with cluster(1) as (s, [a]):
        with Client(s['address'], loop=loop) as e, mock_ipython() as ip:
            worker = first(e.ncores())
            ip.user_ns['info'] = e.start_ipython_workers(worker)[worker]
            remote_magic('info 1')  # line magic
            remote_magic('info', 'worker')  # cell magic

        expected = [
            ((remote_magic,), {'magic_kind': 'line', 'magic_name': 'remote'}),
            ((remote_magic,), {'magic_kind': 'cell', 'magic_name': 'remote'}),
        ]
        assert ip.register_magic_function.call_args_list == expected
        assert ip.register_magic_function.call_count == 2


@pytest.mark.ipython
@need_functional_ipython
def test_start_ipython_qtconsole(loop):
    Popen = mock.Mock()
    with cluster() as (s, [a, b]):
        with mock.patch('distributed._ipython_utils.Popen', Popen), Client(s['address'], loop=loop) as e:
            worker = first(e.ncores())
            e.start_ipython_workers(worker, qtconsole=True)
            e.start_ipython_workers(worker, qtconsole=True, qtconsole_args=['--debug'])
    assert Popen.call_count == 2
    (cmd,), kwargs = Popen.call_args_list[0]
    assert cmd[:3] == ['jupyter', 'qtconsole', '--existing']
    (cmd,), kwargs = Popen.call_args_list[1]
    assert cmd[-1:] == ['--debug']
