ó
_y]c           @   sœ   d  d l  Z  d  d l Z d  d l Z d  d l m Z d  d l m Z e j d d k Z e rf e	 Z
 n  d e j f d „  ƒ  YZ e d k r˜ e j ƒ  n  d S(	   iÿÿÿÿN(   t   Empty(   t   QtKernelManageri    t   2t   Testsc           B   s8   e  Z d  „  Z d „  Z d d „ Z d „  Z d „  Z RS(   c         C   sâ   t  ƒ  |  _ |  j j ƒ  |  j j ƒ  |  _ |  j j d t d t ƒ |  j j ƒ  |  _ |  j j d t d t ƒ |  j j |  _ |  j j	 d ƒ y |  j
 ƒ  |  j
 ƒ  Wn5 t k
 rÝ |  j j	 d ƒ |  j
 ƒ  |  j
 ƒ  n Xd S(   s   Open a kernel.t   shellt   iopubs   print(0)N(   R   t   kernel_managert   start_kernelt   clientt   kernel_clientt   start_channelst   Truet   blocking_clientt   comm_managert   executet   _get_next_msgt   TimeoutError(   t   self(    (    s9   lib/python2.7/site-packages/qtconsole/tests/test_comms.pyt   setUp   s    

c         C   s<   |  j  r |  j  j d t ƒ n  |  j r8 |  j j ƒ  n  d S(   s   Close the kernel.t   nowN(   R   t   shutdown_kernelR   R	   t   shutdown(   R   (    (    s9   lib/python2.7/site-packages/qtconsole/tests/test_comms.pyt   tearDown&   s    		i
   c         C   sƒ   t  j  ƒ  | } d } xf | d k r~ | t  j  ƒ  k  r@ t ‚ n  y' |  j j d d ƒ } | d d } Wq t k
 rz q Xq W| S(   Nt   statust   timeouti   t   headert   msg_type(   t   timeR   R   t   get_iopub_msgR    (   R   R   t   timeout_timeR   t   msg(    (    s9   lib/python2.7/site-packages/qtconsole/tests/test_comms.pyR   -   s    	c            sÉ  |  j  ‰  |  j } d d ‡  f d †  ƒ  Y} | ƒ  } | j d ƒ |  j ƒ  } | d d d k sg t ‚ |  j ƒ  } | d d d k s t ‚ ˆ  j | ƒ | j d k s¯ t ‚ | j j | d	 d
 k sÏ t ‚ |  j ƒ  } | d d d k sõ t ‚ ˆ  j | ƒ | j d k st ‚ | j j | d	 d
 k s7t ‚ |  j ƒ  } | d d d k s]t ‚ ˆ  j | ƒ | j d k st ‚ | j j | d	 d
 k sŸt ‚ |  j ƒ  } | d d d k sÅt ‚ d S(   s,   Communicate from the kernel to the frontend.t   DummyCommHandlerc              s)   e  Z ‡  f d  †  Z d „  Z d „  Z RS(   c            s    ˆ  j  d |  j ƒ d  |  _ d  S(   Nt   test_api(   t   register_targett	   comm_opent   Nonet   last_msg(   R   (   R   (    s9   lib/python2.7/site-packages/qtconsole/tests/test_comms.pyt   __init__A   s    c         S   s>   | j  |  j ƒ | j |  j ƒ | d d |  _ | |  _ d  S(   Nt   contentt   data(   t   on_msgt   comm_messaget   on_closeR$   t   comm(   R   R+   R   (    (    s9   lib/python2.7/site-packages/qtconsole/tests/test_comms.pyR"   E   s    c         S   s   | d d |  _  d  S(   NR&   R'   (   R$   (   R   R   (    (    s9   lib/python2.7/site-packages/qtconsole/tests/test_comms.pyR)   K   s    (   t   __name__t
   __module__R%   R"   R)   (    (   R   (    s9   lib/python2.7/site-packages/qtconsole/tests/test_comms.pyR   @   s   	s‘   from ipykernel.comm import Comm
comm = Comm(target_name='test_api', data='open')
comm.send('message')
comm.close('close')
del comm
print('Done')
R   R   t   execute_inputR"   t   openR&   t   comm_idt   comm_msgt   messaget
   comm_closet   closet   streamN(    (	   R   R   R   R   t   AssertionErrort	   _dispatchR$   R+   R0   (   R   R   R   t   handlerR   (    (   R   s9   lib/python2.7/site-packages/qtconsole/tests/test_comms.pyt   test_kernel_to_frontend;   s2    			   c         C   s{  |  j  } |  j } | j d ƒ |  j ƒ  } | d d d k sE t ‚ | j d d d ƒ} |  j ƒ  } | d d d k s€ t ‚ | d	 d
 d k sš t ‚ | j d ƒ |  j ƒ  } | d d d k sÍ t ‚ | d	 d
 d k sç t ‚ | j d ƒ |  j ƒ  } | d d d k st ‚ | j | d	 d k s7t ‚ |  j ƒ  } | d d d k s]t ‚ | d	 d
 d k swt ‚ d S(   s,   Communicate from the frontend to the kernel.s‘  class DummyCommHandler():
    def __init__(self):
        get_ipython().kernel.comm_manager.register_target(
            'test_api', self.comm_open)
    def comm_open(self, comm, msg):
        comm.on_msg(self.comm_message)
        comm.on_close(self.comm_message)
        print(msg['content']['data'])
    def comm_message(self, msg):
        print(msg['content']['data'])
dummy = DummyCommHandler()
R   R   R.   R    R'   R/   R5   R&   t   texts   open
R2   s   message
R4   R3   R0   s   close
N(	   R   R   R   R   R6   t   new_commt   sendR4   R0   (   R   R   R   R   R+   (    (    s9   lib/python2.7/site-packages/qtconsole/tests/test_comms.pyt   test_frontend_to_kernelp   s*    		(   R,   R-   R   R   R   R9   R=   (    (    (    s9   lib/python2.7/site-packages/qtconsole/tests/test_comms.pyR      s
   			5t   __main__(   R   t   syst   unittestt    jupyter_client.blocking.channelsR    t   qtconsole.managerR   t   versiont   PY2t   RuntimeErrorR   t   TestCaseR   R,   t   main(    (    (    s9   lib/python2.7/site-packages/qtconsole/tests/test_comms.pyt   <module>   s   	‹