B
    F.\&                 @   sf  d dl mZmZmZ d dlmZmZ d dlZd dlZd dl	Z	d dl
Z
d dlZd dlmZmZ d dlmZ d dlmZ ddlmZ dd	lmZ dd
lmZ ddlmZmZ ddlmZmZmZm Z  e!e"Z#eddZ$G dd de%Z&e& Z'dd Z(G dd de)Z*G dd de%Z+e% Z,G dd deZ-G dd deZ.G dd deZ/G dd deZ0e0 ed < dS )!    )print_functiondivisionabsolute_import)deque
namedtupleN)genlocks)Future)IOLoop   )finalize)nested_deserialize)get_ip   )Backendbackends)Comm	ConnectorListenerCommClosedErrorConnectionRequest)c2s_qs2c_qc_loopc_addr
conn_eventc               @   s@   e Zd ZdZdd Zdd Zdd Zdd	 Zd
d Zdd Z	dS )Managerz?
    An object coordinating listeners and their addresses.
    c             C   s,   t  | _td| _t | _t	 | _
d S )Nr   )weakrefWeakValueDictionary	listeners	itertoolscountaddr_suffixesr   ip	threadingZLocklock)self r'   6lib/python3.7/site-packages/distributed/comm/inproc.py__init__"   s    
zManager.__init__c          	   C   s8   | j ( || jkr td|f || j|< W d Q R X d S )Nzalready listening on %r)r%   r   RuntimeError)r&   addrlistenerr'   r'   r(   add_listener(   s    
zManager.add_listenerc          	   C   s8   | j ( y| j|= W n tk
r(   Y nX W d Q R X d S )N)r%   r   KeyError)r&   r+   r'   r'   r(   remove_listener.   s
    zManager.remove_listenerc          	   C   s(   | j  | | | j|S Q R X d S )N)r%   validate_addressr   get)r&   r+   r'   r'   r(   get_listener_for5   s    
zManager.get_listener_forc             C   s   d| j t t| jf S )Nz%s/%d/%s)r#   osgetpidnextr"   )r&   r'   r'   r(   new_address:   s    zManager.new_addressc             C   sF   | d\}}}|| jks*t|t krBtd|| jt f dS )z3
        Validate the address' IP and pid.
        /z6inproc address %r does not match host (%r) or pid (%r)N)splitr#   intr3   r4   
ValueError)r&   r+   r#   pidsuffixr'   r'   r(   r0   =   s    zManager.validate_addressN)
__name__
__module____qualname____doc__r)   r-   r/   r2   r6   r0   r'   r'   r'   r(   r      s   r   c               C   s   dt   S )z!
    Generate a new address.
    z	inproc://)global_managerr6   r'   r'   r'   r(   r6   J   s    r6   c               @   s   e Zd ZdS )
QueueEmptyN)r=   r>   r?   r'   r'   r'   r(   rB   Q   s   rB   c               @   sF   e Zd ZdZdd Zdd Zdd Zdd	 ZeZe	 Z
e
fd
dZdS )QueuezI
    A single-reader, single-writer, non-threadsafe, peekable queue.
    c             C   s   t  | _d | _d S )N)r   _q_read_future)r&   r'   r'   r(   r)   Z   s    zQueue.__init__c             C   s   | j }|st| S )N)rD   rB   popleft)r&   qr'   r'   r(   
get_nowait^   s    zQueue.get_nowaitc             C   s8   | j rtdt }| j}|r.||  n|| _ |S )NzOnly one reader allowed)rE   AssertionErrorr	   rD   
set_resultrF   )r&   futrG   r'   r'   r(   r1   d   s    z	Queue.getc             C   sD   | j }| j}|d k	r6t|dks$td | _|| n
|| d S )Nr   )rD   rE   lenrI   rJ   append)r&   valuerG   rK   r'   r'   r(   
put_nowaitn   s    zQueue.put_nowaitc             C   s(   | j }|r|d S || jk	r |S tdS )zV
        Get the next object in the queue without removing it from the queue.
        r   N)rD   _omittedrB   )r&   defaultrG   r'   r'   r(   peek|   s    
z
Queue.peekN)r=   r>   r?   r@   r)   rH   r1   rO   ZputobjectrP   rR   r'   r'   r'   r(   rC   U   s   

rC   c               @   s|   e Zd ZdZdZdddZdd Zedd	 Zed
d Z	e
jdddZe
jdddZe
jdd Zdd Zdd ZdS )InProcz
    An established communication based on a pair of in-process queues.

    Reminder: a Comm must always be used from a single thread.
    Its peer Comm can be running in any thread.
    FTc             C   sL   || _ || _|| _|| _|| _|| _d| _t| |  | _	d| j	_
d| _d S )NFT)_local_addr
_peer_addrdeserialize_read_q_write_q_write_loop_closedr   _get_finalizer
_finalizeratexit_initialized)r&   
local_addr	peer_addrread_qwrite_q
write_looprW   r'   r'   r(   r)      s    zInProc.__init__c             C   s   | j | jt| fdd}|S )Nc             S   s"   t d|f  || jt d S )NzClosing dangling queue in %s)loggerZwarningadd_callbackrO   _EOF)rc   rd   rr'   r'   r(   r      s    z'InProc._get_finalizer.<locals>.finalize)rY   rZ   repr)r&   r   r'   r'   r(   r\      s    zInProc._get_finalizerc             C   s   | j S )N)rU   )r&   r'   r'   r(   local_address   s    zInProc.local_addressc             C   s   | j S )N)rV   )r&   r'   r'   r(   peer_address   s    zInProc.peer_addressignoredc             c   sN   | j r
t| j V }|tkr2d| _ | j  t| jr@t|}t	
|d S )NT)r[   r   rX   r1   rg   r]   detachrW   r   r   Return)r&   Zdeserializersmsgr'   r'   r(   read   s    
zInProc.readNc             C   s,   |   rt| j| jj| tdd S )Nr   )closedr   rZ   rf   rY   rO   r   rn   )r&   ro   ZserializersZon_errorr'   r'   r(   write   s    zInProc.writec             C   s   |    d S )N)abort)r&   r'   r'   r(   close   s    zInProc.closec             C   sF   |   sB| j| jjt | jt d  | _| _d| _| j	  d S )NT)
rq   rZ   rf   rY   rO   rg   rX   r[   r]   rm   )r&   r'   r'   r(   rs      s    zInProc.abortc             C   s<   | j r
dS | jr4| jdtkr4d| _ | j  dS dS dS )z
        Whether this comm is closed.  An InProc comm is closed if:
            1) close() or abort() was called on this comm
            2) close() or abort() was called on the other end and the
               read queue is empty
        TNF)r[   r_   rX   rR   rg   r]   rm   )r&   r'   r'   r(   rq      s    
zInProc.closed)T)rl   )NN)r=   r>   r?   r@   r_   r)   r\   propertyrj   rk   r   	coroutinerp   rr   rt   rs   rq   r'   r'   r'   r(   rT      s   
		rT   c               @   sX   e Zd ZdZdddZejdd Zdd Zd	d
 Z	dd Z
edd Zedd ZdS )InProcListenerinprocTc             C   s.   t | _|p| j | _|| _|| _t | _d S )N)rA   managerr6   addresscomm_handlerrW   rC   listen_q)r&   rz   r{   rW   r'   r'   r(   r)      s
    zInProcListener.__init__c             c   sb   x\| j  V }|d krP td| j d|j |j|j|j| jd}|j	|j
j | | qW d S )Nz	inproc://)r`   ra   rb   rc   rd   rW   )r|   r1   rT   rz   r   r   r   r   rW   rf   r   setr{   )r&   conn_reqcommr'   r'   r(   _listen   s    

zInProcListener._listenc             C   s   | j | jj| d S )N)looprf   r|   rO   )r&   r~   r'   r'   r(   connect_threadsafe  s    z!InProcListener.connect_threadsafec             C   s,   t  | _| j| j | j| j|  d S )N)r
   currentr   rf   r   ry   r-   rz   )r&   r'   r'   r(   start  s    
zInProcListener.startc             C   s   | j d  | j| j d S )N)r|   rO   ry   r/   rz   )r&   r'   r'   r(   stop  s    zInProcListener.stopc             C   s
   d| j  S )Nz	inproc://)rz   )r&   r'   r'   r(   listen_address  s    zInProcListener.listen_addressc             C   s
   d| j  S )Nz	inproc://)rz   )r&   r'   r'   r(   contact_address  s    zInProcListener.contact_addressN)T)r=   r>   r?   prefixr)   r   rv   r   r   r   r   ru   r   r   r'   r'   r'   r(   rw      s   
rw   c               @   s$   e Zd Zdd ZejdddZdS )InProcConnectorc             C   s
   || _ d S )N)ry   )r&   ry   r'   r'   r(   r)     s    zInProcConnector.__init__Tc             k   s   | j |}|d kr"td|f tt t t | j  t	 d}|
| |j V  td|j d| |j|j|j|d}t|d S )Nz!no endpoint for inproc address %r)r   r   r   r   r   z	inproc://)r`   ra   rb   rc   rd   rW   )ry   r2   IOErrorr   rC   r
   r   r6   r   ZEventr   r   waitrT   r   r   r   r   r   rn   )r&   rz   rW   connection_argsr,   r~   r   r'   r'   r(   connect  s"    

zInProcConnector.connectN)T)r=   r>   r?   r)   r   rv   r   r'   r'   r'   r(   r     s   r   c               @   s8   e Zd ZeZdd Zdd Zdd Zdd Zd	d
 Z	dS )InProcBackendc             C   s
   t | jS )N)r   ry   )r&   r'   r'   r(   get_connector?  s    zInProcBackend.get_connectorc             K   s   t |||S )N)rw   )r&   locZhandle_commrW   r   r'   r'   r(   get_listenerB  s    zInProcBackend.get_listenerc             C   s   | j | | j jS )N)ry   r0   r#   )r&   r   r'   r'   r(   get_address_hostG  s    zInProcBackend.get_address_hostc             C   s   |S )Nr'   )r&   r   r'   r'   r(   resolve_addressK  s    zInProcBackend.resolve_addressc             C   s   | j | | j  S )N)ry   r0   r6   )r&   r   r'   r'   r(   get_local_address_forN  s    z#InProcBackend.get_local_address_forN)
r=   r>   r?   rA   ry   r   r   r   r   r   r'   r'   r'   r(   r   :  s   r   rx   )1Z
__future__r   r   r   collectionsr   r   r    Zloggingr3   r$   r   Ztornador   r   Ztornado.concurrentr	   Ztornado.ioloopr
   Zcompatibilityr   Zprotocolr   Zutilsr   registryr   r   Zcorer   r   r   r   Z	getLoggerr=   re   r   rS   r   rA   r6   	ExceptionrB   rC   rg   rT   rw   r   r   r'   r'   r'   r(   <module>   s8   
*4_/ 