B
    \                 @   s   d Z ddlmZ ddlZddlZddlZddlmZmZ ddl	Z	ddl
mZ ddlmZmZ ddlmZmZ ddlmZ dd	lmZ G d
d deZG dd deZG dd deZdS )zc Defines a KernelClient that provides thread-safe sockets with async callbacks on message replies.
    )absolute_importN)ThreadEvent)ZMQError)ioloop	zmqstream)TypeInstance)	HBChannel)KernelClientc                   s   e Zd ZdZdZdZdZdZdZ fddZ	dZ
dd Zdd	 Zd
d Zdd Zdd Zdd Zdd Zdd ZdddZdd Z  ZS )ThreadedZMQSocketChannelz.A ZMQ socket invoking a callback in the ioloopNc                sL   t t  |_|_|_t   fdd}j|    dS )a'  Create a channel.

        Parameters
        ----------
        socket : :class:`zmq.Socket`
            The ZMQ socket to use.
        session : :class:`session.Session`
            The session to use.
        loop
            A pyzmq ioloop to connect the socket to using a ZMQStream
        c                  s,   t jj_jj    d S )N)r   Z	ZMQStreamsocketr   streamZon_recv_handle_recvset )evtselfr   6lib/python3.7/site-packages/jupyter_client/threaded.pysetup_stream/   s    z7ThreadedZMQSocketChannel.__init__.<locals>.setup_streamN)	superr   __init__r   sessionr   r   add_callbackwait)r   r   r   Zloopr   )	__class__)r   r   r   r      s    z!ThreadedZMQSocketChannel.__init__Fc             C   s   | j S )N)	_is_alive)r   r   r   r   is_alive8   s    z!ThreadedZMQSocketChannel.is_alivec             C   s
   d| _ d S )NT)r   )r   r   r   r   start;   s    zThreadedZMQSocketChannel.startc             C   s
   d| _ d S )NF)r   )r   r   r   r   stop>   s    zThreadedZMQSocketChannel.stopc             C   s<   | j d k	r8y| j jdd W n tk
r0   Y nX d | _ d S )Nr   )Zlinger)r   close	Exception)r   r   r   r   r    A   s    
zThreadedZMQSocketChannel.closec                s    fdd}j | dS )zQueue a message to be sent from the IOLoop's thread.

        Parameters
        ----------
        msg : message to send

        This is threadsafe, as it uses IOLoop.add_callback to give the loop's
        thread control of the action.
        c                  s   j j  d S )N)r   sendr   r   )msgr   r   r   thread_sendS   s    z2ThreadedZMQSocketChannel.send.<locals>.thread_sendN)r   r   )r   r#   r$   r   )r#   r   r   r"   I   s    
zThreadedZMQSocketChannel.sendc             C   s:   | j |\}}| j |}| jr,| | | | dS )z[Callback for stream.on_recv.

        Unpacks message, and calls handlers with it.
        N)r   Zfeed_identitiesZdeserialize_inspectcall_handlers)r   r#   ZidentZsmsgr   r   r   r   W   s
    
z%ThreadedZMQSocketChannel._handle_recvc             C   s   dS )ai  This method is called in the ioloop thread when a message arrives.

        Subclasses should override this method to handle incoming messages.
        It is important to remember that this method is called in the thread
        so that some logic must be done to ensure that the application level
        handlers are called in the application thread.
        Nr   )r   r#   r   r   r   r&   c   s    z&ThreadedZMQSocketChannel.call_handlersc             C   s   dS )zaSubclasses should override this with a method
        processing any pending GUI events.
        Nr   )r   r   r   r   process_eventsm   s    z'ThreadedZMQSocketChannel.process_events      ?c             C   sX   t   | }xFtdD ]:}d| _| j| j x | jsNt   |k rNt d q0W qW dS )a  Immediately processes all pending messages on this channel.

        This is only used for the IOPub channel.

        Callers should use this method to ensure that :meth:`call_handlers`
        has been called for all messages that have been received on the
        0MQ SUB socket of this channel.

        This method is thread safe.

        Parameters
        ----------
        timeout : float, optional
            The maximum amount of time to spend flushing, in seconds. The
            default is one second.
           Fg{Gz?N)timerange_flushedr   r   _flushZsleep)r   ZtimeoutZ	stop_timeir   r   r   flusht   s    zThreadedZMQSocketChannel.flushc             C   s   | j   d| _dS )z"Callback for :method:`self.flush`.TN)r   r/   r,   )r   r   r   r   r-      s    
zThreadedZMQSocketChannel._flush)r(   )__name__
__module____qualname____doc__r   r   r   r   r%   r   r   r   r   r   r    r"   r   r&   r'   r/   r-   __classcell__r   r   )r   r   r      s$   

r   c                   sZ   e Zd ZdZdZdZ fddZeej	dd Z
dd	 Zd
d Zdd Zdd Z  ZS )IOLoopThreadz@Run a pyzmq ioloop in a thread to send and receive messages
    FNc                s   t t|   d| _d S )NT)r   r5   r   Zdaemon)r   )r   r   r   r      s    zIOLoopThread.__init__c               C   s   t d k	rdt _d S )NT)r5   _exitingr   r   r   r   _notice_exit   s    zIOLoopThread._notice_exitc             C   s    t  | _t|  | j  dS )z{Start the IOLoop thread

        Don't return until self.ioloop is defined,
        which is created in the thread
        N)r   _start_eventr   r   r   )r   r   r   r   r      s    
zIOLoopThread.startc          
   C   s   dt jkr ddl}||  t | _| j  xhy| j	  W nR t
k
rx } z|jtjkrfw6n W dd}~X Y q6 tk
r   | jrP n Y q6X P q6W dS )z0Run my loop, ignoring EINTR events in the pollerasyncior   N)sysmodulesr9   Zset_event_loopZnew_event_loopr   ZIOLoopr8   r   r   r   errnoZEINTRr!   r6   )r   r9   er   r   r   run   s"    


zIOLoopThread.runc             C   s4   | j dk	r| j | j j |   |   d| _ dS )zStop the channel's event loop and join its thread.

        This calls :meth:`~threading.Thread.join` and returns when the thread
        terminates. :class:`RuntimeError` will be raised if
        :meth:`~threading.Thread.start` is called again.
        N)r   r   r   joinr    )r   r   r   r   r      s
    
zIOLoopThread.stopc             C   s6   | j d k	r2y| j jdd W n tk
r0   Y nX d S )NT)Zall_fds)r   r    r!   )r   r   r   r   r       s
    
zIOLoopThread.close)r0   r1   r2   r3   r6   r   r   staticmethodatexitregisterr7   r   r>   r   r    r4   r   r   )r   r   r5      s   
r5   c                   sn   e Zd ZdZedd ZeeddZd fdd	Z	dd	 Z
 fd
dZeeZeeZeeZeeZ  ZS )ThreadedKernelClientz_ A KernelClient that provides thread-safe sockets with async callbacks on message replies.
    c             C   s   | j jS )N)ioloop_threadr   )r   r   r   r   r      s    zThreadedKernelClient.ioloopT)Z
allow_nonec                s:   t  | _| j  |r | j| j_tt| |||| d S )N)	r5   rD   r   _check_kernel_info_replyshell_channelr%   r   rC   start_channels)r   shellZiopubstdinZhb)r   r   r   rG      s
    

z#ThreadedKernelClient.start_channelsc             C   s"   |d dkr|  | d| j_dS )zPThis is run in the ioloop thread when the kernel info reply is received
        Zmsg_typeZkernel_info_replyN)Z_handle_kernel_info_replyrF   r%   )r   r#   r   r   r   rE      s    
z-ThreadedKernelClient._check_kernel_info_replyc                s&   t t|   | j r"| j  d S )N)r   rC   stop_channelsrD   r   r   )r   )r   r   r   rJ      s    
z"ThreadedKernelClient.stop_channels)TTTT)r0   r1   r2   r3   propertyr   r	   r5   rD   rG   rE   rJ   r   r   Ziopub_channel_classZshell_channel_classZstdin_channel_classr
   Zhb_channel_classr4   r   r   )r   r   rC      s   	rC   )r3   Z
__future__r   rA   r<   r:   Z	threadingr   r   r*   Zzmqr   Zzmq.eventloopr   r   Z	traitletsr   r	   Zjupyter_client.channelsr
   Zjupyter_clientr   objectr   r5   rC   r   r   r   r   <module>   s    K