B
    U²È[W;  ã               @   s  d Z ddlmZ ddlZddlmZ ddlmZ yddlm	Z
 W n  ek
r`   ddlm	Z
 Y nX ddlZddlZddlZddlZddlmZmZ ddlZddlmZ dd	lmZ dd
lmZ ddlmZ ddlmZ dZdZ G dd„ de!ƒZ"G dd„ de!ƒZ#G dd„ deƒZ$dS )z.Wrappers for forwarding stdout/stderr over zmqé    )Úprint_functionN)Úb2a_hex)Údeque)Ú	lock_held)ÚStringIOÚ
TextIOBase)ÚIOLoop)Ú	ZMQStream)Úextract_header)Ú	py3compat)Úunicode_typeé   c               @   s¢   e Zd ZdZd&dd„Zdd„ Zdd„ Zed	d
„ ƒZdd„ Z	dd„ Z
dd„ Zdd„ Zdd„ Zdd„ Zdd„ Zdd„ Zdd„ Zedd„ ƒZdd „ Zd!d"„ Zd#d$„ Zd%S )'ÚIOPubThreada   An object for sending IOPub messages in a background thread

    Prevents a blocking main thread from delaying output from threads.

    IOPubThread(pub_socket).background_socket is a Socket-API-providing object
    whose IO is always run in a thread.
    Fc             C   sn   || _ t| ƒ| _t ¡ | _|| _tdd| _|r8|  	¡  t
 ¡ | _tƒ | _|  ¡  t
j| jd| _d| j_dS )a  Create IOPub thread

        Parameters
        ----------

        socket: zmq.PUB Socket
            the socket on which messages will be sent.
        pipe: bool
            Whether this process should listen for IOPub messages
            piped from subprocesses.
        F)Úmake_current)ÚtargetTN)ÚsocketÚBackgroundSocketZbackground_socketÚosÚgetpidÚ_master_pidÚ
_pipe_flagr   Úio_loopÚ_setup_pipe_inÚ	threadingZlocalÚ_localr   Ú_eventsÚ_setup_event_pipeZThreadÚ_thread_mainÚthreadZdaemon)Úselfr   Úpipe© r!   ú1lib/python3.7/site-packages/ipykernel/iostream.pyÚ__init__2   s    


zIOPubThread.__init__c             C   s&   | j  ¡  | j  ¡  | j jdd dS )z.The inner loop that's actually run in a threadT)Zall_fdsN)r   r   ÚstartÚclose)r   r!   r!   r"   r   K   s    

zIOPubThread._thread_mainc             C   sf   | j j}|  tj¡}d|_tt d¡ƒ d¡}d|  }| _	| 
|¡ t|| jƒ| _| j | j¡ dS )zLCreate the PULL socket listening for events that should fire in this thread.r   é   Úasciizinproc://%sN)r   ÚcontextÚzmqÚPULLÚlingerr   r   ÚurandomÚdecodeÚ_event_interfaceZbindr	   r   Z_event_pullerÚon_recvÚ_handle_event)r   ÚctxÚpipe_inZ_uuidZifacer!   r!   r"   r   Q   s    
zIOPubThread._setup_event_pipec             C   sT   y| j j}W nB tk
rN   | jj}| tj¡}d|_| | j	¡ || j _Y nX |S )zSthread-local event pipe for signaling events that should be processed in the threadr   )
r   Ú
event_pipeÚAttributeErrorr   r(   r)   ÚPUSHr+   Úconnectr.   )r   r3   r1   r!   r!   r"   Ú_event_pipe]   s    zIOPubThread._event_pipec             C   s0   t | jƒ}x t|ƒD ]}| j ¡ }|ƒ  qW dS )zÅHandle an event on the event pipe

        Content of the message is ignored.

        Whenever *an* event arrives on the event stream,
        *all* waiting events are processed in order.
        N)Úlenr   ÚrangeÚpopleft)r   ÚmsgZn_eventsÚiZevent_fr!   r!   r"   r0   k   s    


zIOPubThread._handle_eventc          
   C   sœ   | j j}t d¡| _|  tj¡}d|_y| d¡| _	W nD tj
k
rz } z$t d| d ¡ d| _| ¡  dS d}~X Y nX t|| jƒ| _| j | j¡ dS )z7setup listening pipe for IOPub from forked subprocessesr&   r   ztcp://127.0.0.1z)Couldn't bind IOPub Pipe to 127.0.0.1: %sz'
subprocess output will be unavailable.FN)r   r(   r   r,   Ú
_pipe_uuidr)   r*   r+   Zbind_to_random_portÚ
_pipe_portZZMQErrorÚwarningsÚwarnr   r%   r	   r   Z_pipe_inr/   Ú_handle_pipe_msg)r   r1   r2   Úer!   r!   r"   r   z   s    
zIOPubThread._setup_pipe_inc             C   sJ   | j r|  ¡ sdS |d | jkr4td|tjd dS |  |dd… ¡ dS )z'handle a pipe message from a subprocessNr   zBad pipe message: %s)Úfiler   )r   Ú_is_master_processr=   ÚprintÚsysÚ
__stderr__Úsend_multipart)r   r;   r!   r!   r"   rA      s    zIOPubThread._handle_pipe_msgc             C   s2   t  ¡ }| t j¡}d|_| d| j ¡ ||fS )Ni¸  ztcp://127.0.0.1:%i)r)   ZContextr   r5   r+   r6   r>   )r   r1   Úpipe_outr!   r!   r"   Ú_setup_pipe_out™   s
    zIOPubThread._setup_pipe_outc             C   s   t  ¡ | jkS )N)r   r   r   )r   r!   r!   r"   rD   ¡   s    zIOPubThread._is_master_processc             C   s   | j r|  ¡ rtS tS dS )z8check for forks, and switch to zmq pipeline if necessaryN)r   rD   ÚMASTERÚCHILD)r   r!   r!   r"   Ú_check_mp_mode¤   s    zIOPubThread._check_mp_modec             C   s   | j  ¡  t | j¡ dS )zStart the IOPub threadN)r   r$   ÚatexitÚregisterÚstop)r   r!   r!   r"   r$   «   s    
zIOPubThread.startc             C   sD   | j  ¡ sdS | j | jj¡ | j  ¡  t| jdƒr@| jj 	¡  dS )zStop the IOPub threadNr3   )
r   Úis_aliver   Zadd_callbackrP   ÚjoinÚhasattrr   r3   r%   )r   r!   r!   r"   rP   ²   s    

zIOPubThread.stopc             C   s   | j  ¡  d | _ d S )N)r   r%   )r   r!   r!   r"   r%   »   s    
zIOPubThread.closec             C   s
   | j d kS )N)r   )r   r!   r!   r"   Úclosed¿   s    zIOPubThread.closedc             C   s.   | j  ¡ r$| j |¡ | j d¡ n|ƒ  dS )ztSchedule a function to be called in our IO thread.

        If the thread is not running, call immediately.
        ó    N)r   rQ   r   Úappendr7   Úsend)r   Úfr!   r!   r"   ÚscheduleÃ   s    
zIOPubThread.schedulec                s   ˆ  ‡ ‡‡fdd„¡ dS )z”send_multipart schedules actual zmq send in my thread.
        
        If my thread isn't running (e.g. forked process), send immediately.
        c                  s   ˆj ˆ ˆŽS )N)Ú_really_sendr!   )ÚargsÚkwargsr   r!   r"   Ú<lambda>Ô   s    z,IOPubThread.send_multipart.<locals>.<lambda>N)rY   )r   r[   r\   r!   )r[   r\   r   r"   rH   Ï   s    zIOPubThread.send_multipartc             O   s`   |   ¡ }|tkr&| jj|f|ž|Ž n6|  ¡ \}}|j| jg| f|ž|Ž | ¡  | ¡  dS )z)The callback that actually sends messagesN)rM   rL   r   rH   rJ   r=   r%   Zterm)r   r;   r[   r\   Zmp_moder1   rI   r!   r!   r"   rZ   Ö   s    zIOPubThread._really_sendN)F)Ú__name__Ú
__module__Ú__qualname__Ú__doc__r#   r   r   Úpropertyr7   r0   r   rA   rJ   rD   rM   r$   rP   r%   rT   rY   rH   rZ   r!   r!   r!   r"   r   )   s$   
		r   c                   sH   e Zd ZdZdZdd„ Z‡ fdd„Z‡ fdd„Zd	d
„ Zdd„ Z	‡  Z
S )r   z>Wrapper around IOPub thread that provides zmq send[_multipart]Nc             C   s
   || _ d S )N)Ú	io_thread)r   rc   r!   r!   r"   r#   ë   s    zBackgroundSocket.__init__c                sh   |  d¡r$| d¡r$tt| ƒ |¡ t| jj|ƒrTtj	d| t
dd t| jj|ƒS tt| ƒ |¡ dS )z2Wrap socket attr access for backward-compatibilityÚ__z5Accessing zmq Socket attribute %s on BackgroundSocketé   )Ú
stacklevelN)Ú
startswithÚendswithÚsuperr   Ú__getattr__rS   rc   r   r?   r@   ÚDeprecationWarningÚgetattr)r   Úattr)Ú	__class__r!   r"   rj   î   s    

zBackgroundSocket.__getattr__c                sX   |dks|  do| d¡¡r0tt| ƒ ||¡ n$tjd| tdd t| j	j
||ƒ d S )Nrc   rd   z3Setting zmq Socket attribute %s on BackgroundSocketre   )rf   )rg   rh   ri   r   Ú__setattr__r?   r@   rk   Úsetattrrc   r   )r   rm   Úvalue)rn   r!   r"   ro   ù   s
    

zBackgroundSocket.__setattr__c             O   s   | j |gf|ž|ŽS )N)rH   )r   r;   r[   r\   r!   r!   r"   rW     s    zBackgroundSocket.sendc             O   s   | j j||ŽS )zSchedule send in IO thread)rc   rH   )r   r[   r\   r!   r!   r"   rH     s    zBackgroundSocket.send_multipart)r^   r_   r`   ra   rc   r#   rj   ro   rW   rH   Ú__classcell__r!   r!   )rn   r"   r   ç   s   r   c               @   sŽ   e Zd ZdZdZdZdZdZd dd„Zdd	„ Z	d
d„ Z
dd„ Zedd„ ƒZdd„ Zdd„ Zdd„ Zdd„ Zdd„ Zdd„ Zdd„ Zdd„ ZdS )!Ú	OutStreamztA file like object that publishes the stream to a 0MQ PUB socket.
    
    Output is handed off to an IO Thread
    é
   gš™™™™™É?NzUTF-8c             C   s¼   |d k	rt  dt¡ || _t|tƒsHt jd| tdd t|ƒ}| ¡  || _|| _dt	 
|¡ | _i | _t ¡ | _d| _|j| _|  ¡  d | _|r¸t|dƒr°t|dƒr°|| _ntd	ƒ‚d S )
Nz4pipe argument to OutStream is deprecated and ignoredz4OutStream should be created with IOPubThread, not %rre   )rf   s   stream.FÚreadÚwritez(echo argument must be a file like object)r?   r@   rk   ÚsessionÚ
isinstancer   r$   Ú
pub_threadÚnamer   Z
cast_bytesÚtopicÚparent_headerr   r   r   Ú_flush_pendingr   Ú_io_loopÚ_new_bufferÚechorS   Ú
ValueError)r   rw   ry   rz   r    r€   r!   r!   r"   r#     s,    



zOutStream.__init__c             C   s   t  ¡ | jkS )N)r   r   r   )r   r!   r!   r"   rD   3  s    zOutStream._is_master_processc             C   s   t |ƒ| _d S )N)r
   r|   )r   Úparentr!   r!   r"   Ú
set_parent6  s    zOutStream.set_parentc             C   s
   d | _ d S )N)ry   )r   r!   r!   r"   r%   9  s    zOutStream.closec             C   s
   | j d kS )N)ry   )r   r!   r!   r"   rT   <  s    zOutStream.closedc                s,   ˆ j r
dS dˆ _ ‡ fdd„}ˆ j |¡ dS )zuschedule a flush in the IO thread

        call this on write, to indicate that flush should be called soon.
        NTc                  s   ˆ j  ˆ jˆ j¡ d S )N)r~   Z
call_laterÚflush_intervalÚ_flushr!   )r   r!   r"   Ú_schedule_in_threadJ  s    z6OutStream._schedule_flush.<locals>._schedule_in_thread)r}   ry   rY   )r   r†   r!   )r   r"   Ú_schedule_flush@  s
    zOutStream._schedule_flushc             C   s^   | j j ¡ rR| j  | j¡ tƒ sZt ¡ }| j  |j¡ | 	| j
¡sZtdtjd n|  ¡  dS )zStrigger actual zmq send

        send will happen in the background thread
        zIOStream.flush timed out)rC   N)ry   r   rQ   rY   r…   Úimport_lock_heldr   ZEventÚsetÚwaitÚflush_timeoutrE   rF   rG   )r   Zevtr!   r!   r"   ÚflushN  s    zOutStream.flushc          
   C   s¦   d| _ | jdk	rby| j ¡  W nB tk
r` } z$| jtjk	rPtd |¡tjd W dd}~X Y nX |  ¡ }|r¢t	 
¡ | j_| j|dœ}| jj| jd|| j| jd dS )z³This is where the actual send happens.

        _flush should generally be called in the IO thread,
        unless the thread has been destroyed (e.g. forked subprocess).
        FNzFlush failed: {})rC   )rz   ÚtextÚstream)Úcontentr‚   Zident)r}   r€   rŒ   ÚOSErrorrF   rG   rE   ÚformatÚ_flush_bufferr   r   rw   Úpidrz   rW   ry   r|   r{   )r   rB   Údatar   r!   r!   r"   r…   d  s    

zOutStream._flushc          
      sÌ   ˆ j d k	r^yˆ j  ˆ¡ W nB tk
r\ } z$ˆ j tjk	rLtd |¡tjd W d d }~X Y nX ˆ jd krrtdƒ‚nVt	ˆt
ƒsŠˆ ˆ jd¡‰ˆ  ¡  }ˆ j ‡ ‡fdd„¡ |rÀdˆkrÈˆ  ¡  nˆ  ¡  d S )NzWrite failed: {})rC   zI/O operation on closed fileÚreplacec                  s   ˆ j  ˆ¡S )N)Ú_bufferrv   r!   )r   Ústringr!   r"   r]     s    z!OutStream.write.<locals>.<lambda>Ú
)r€   rv   r   rF   rG   rE   r‘   ry   r   rx   r   r-   ÚencodingrD   rY   rŒ   r‡   )r   r—   rB   Zis_childr!   )r   r—   r"   rv   ~  s"    






zOutStream.writec             C   s0   | j d krtdƒ‚nx|D ]}|  |¡ qW d S )NzI/O operation on closed file)ry   r   rv   )r   Zsequencer—   r!   r!   r"   Ú
writelinesš  s    


zOutStream.writelinesc             C   s   dS )NTr!   )r   r!   r!   r"   Úwritable¡  s    zOutStream.writablec             C   s0   d}| j dk	r,| j }|  ¡  | ¡ }| ¡  |S )zƒclear the current buffer and return the current buffer data.
        
        This should only be called in the IO thread.
        Ú N)r–   r   Úgetvaluer%   )r   r”   Zbufr!   r!   r"   r’   ¤  s    
zOutStream._flush_bufferc             C   s   t ƒ | _d S )N)r   r–   )r   r!   r!   r"   r   ±  s    zOutStream._new_buffer)NN)r^   r_   r`   ra   r‹   r„   r{   r™   r#   rD   rƒ   r%   rb   rT   r‡   rŒ   r…   rv   rš   r›   r’   r   r!   r!   r!   r"   rs   	  s$   
rs   )%ra   Z
__future__r   rN   Zbinasciir   Úcollectionsr   Ú	importlibr   rˆ   ÚImportErrorÚimpr   rF   r   r?   Úior   r   r)   Zzmq.eventloop.ioloopr   Zzmq.eventloop.zmqstreamr	   Zjupyter_client.sessionr
   Zipython_genutilsr   Zipython_genutils.py3compatr   rK   rL   Úobjectr   r   rs   r!   r!   r!   r"   Ú<module>   s2    ?"