B
    ^\xH  ã               @   sR  d dl mZmZmZ d dlZd dlZd dlZd dlZd dlZyd dl	Z	W n e
k
r`   dZ	Y nX d dlZd dlZd dlmZmZ d dlmZmZ d dlmZ d dlmZ ddlmZmZ dd	lmZ dd
lmZmZmZmZm Z m!Z!m"Z" ddl#m$Z$m%Z% ddl&m'Z'm(Z( ddl)m*Z*m+Z+m,Z,m-Z-m.Z. ddlm/Z/m0Z0m1Z1m2Z2 e 3e4¡Z5dd„ Z6e6ƒ Z7dd„ Z8dd„ Z9dd„ Z:G dd„ de*ƒZ;G dd„ de;ƒZ<dd„ Z=G dd„ de>ƒZ?G d d!„ d!e+e?ƒZ@G d"d#„ d#e@ƒZAG d$d%„ d%e@ƒZBG d&d'„ d'e,e?ƒZCG d(d)„ d)eCƒZDG d*d+„ d+eCƒZEG d,d-„ d-e$ƒZFG d.d/„ d/eFƒZGG d0d1„ d1eFƒZHeGƒ e%d2< eHƒ e%d3< dS )4é    )Úprint_functionÚdivisionÚabsolute_importN)ÚgenÚnetutil)ÚStreamClosedErrorÚIOStream)Ú	TCPClient)Ú	TCPServeré   )ÚfinalizeÚPY3)ÚThreadPoolExecutor)Úensure_bytesÚ	ensure_ipÚget_ipÚget_ipv6ÚnbytesÚparse_timedeltaÚshutting_downé   )ÚBackendÚbackends)Úparse_host_portÚunparse_host_port)ÚCommÚ	ConnectorÚListenerÚCommClosedErrorÚFatalCommClosedError)Ú	to_framesÚfrom_framesÚget_tcp_server_addressÚensure_concrete_hostc              C   s0   ydd l } |  ¡ jd S  tk
r*   dS X d S )Nr   r   g    eÍÝA)ÚpsutilZvirtual_memoryZtotalÚImportError)r$   © r&   ú3lib/python3.7/site-packages/distributed/comm/tcp.pyÚget_total_physical_memory$   s
    r(   c          
   C   sÐ  |   ¡ rdS tj d¡}tt|ddƒ}| j}d}||d ksFtdƒ‚td|d	 ƒ}td|| | ƒ}|||  }|d
ks~t‚yt	j
 d¡rºt d||¡ | tjd|d |d f¡ n¨| tjtjd¡ ytj}tj}tj}W n4 tk
r   t	j
dkrd}d}d}nd}Y nX |dk	rbt d|||¡ | tj||¡ | tj||¡ | tj||¡ t	j
 d¡r˜t d|d ¡ d}	| tj|	|d ¡ W n0 tk
rÊ }
 zt d|
¡ W dd}
~
X Y nX dS )z5
    Set kernel-level TCP timeout on the stream.
    Nzdistributed.comm.timeouts.tcpZseconds)Údefaulté
   r   zTimeout too lowr   é   r   Úwinz+Setting TCP keepalive: idle=%d, interval=%diè  Údarwiné   i  i  z7Setting TCP keepalive: nprobes=%d, idle=%d, interval=%dZlinuxzSetting TCP user timeout: %d msé   z'Could not set timeout on TCP stream: %s)ÚclosedÚdaskÚconfigÚgetÚintr   ÚsocketÚAssertionErrorÚmaxÚsysÚplatformÚ
startswithÚloggerÚdebugZioctlZSIO_KEEPALIVE_VALSZ
setsockoptZ
SOL_SOCKETZSO_KEEPALIVEÚTCP_KEEPIDLEÚTCP_KEEPINTVLÚTCP_KEEPCNTÚAttributeErrorZSOL_TCPÚEnvironmentErrorÚwarning)ÚstreamZtimeoutÚsockZnprobesZidleZintervalr=   r>   r?   ZTCP_USER_TIMEOUTÚer&   r&   r'   Úset_tcp_timeout/   sP    




rF   c             C   s<   |   ¡ rdS yt| j ¡ dd… Ž S  tk
r6   dS X dS )z'
    Get a stream's local address.
    z<closed>Nr   )r0   r   r5   ZgetsocknamerA   )rC   r&   r&   r'   Úget_stream_addressh   s    rG   c             C   sl   |j dk	rX|j }tr@t|tjƒr@d|jkr@td| |jj|f ƒ‚td| |jj|f ƒ‚ntd| |f ƒ‚dS )z8
    Re-raise StreamClosedError as CommClosedError.
    NZ
UNKNOWN_CAzin %s: %s: %sz	in %s: %s)	Ú
real_errorÚsslÚ
isinstanceZSSLErrorÚreasonr   Ú	__class__Ú__name__r   )ÚobjÚexcr&   r&   r'   Úconvert_stream_closed_errorv   s    

rP   c               @   s    e Zd ZdZejdkZeedƒZ	ddd„Z
dd„ Zd	d
„ Zedd„ ƒZedd„ ƒZejddd„ƒZejddd„ƒZejdd„ ƒZdd„ Zdd„ Zedd„ ƒZdS ) ÚTCPzO
    An established communication based on an underlying Tornado IOStream.
    )r+   é   Ú	read_intoTc             C   sT   || _ || _|| _|| _t| |  ¡ ƒ| _d| j_i | _| 	d¡ t
|ƒ |  ¡  d S )NFT)Ú_local_addrÚ
_peer_addrrC   Údeserializer   Ú_get_finalizerÚ
_finalizerÚatexitÚ_extraZset_nodelayrF   Ú_read_extra)ÚselfrC   Z
local_addrZ	peer_addrrV   r&   r&   r'   Ú__init__Ž   s    
zTCP.__init__c             C   s   d S )Nr&   )r\   r&   r&   r'   r[   ›   s    zTCP._read_extrac             C   s   | j t| ƒfdd„}|S )Nc             S   s$   |   ¡ s t d|f ¡ |  ¡  d S )NzClosing dangling stream in %s)r0   r;   rB   Úclose)rC   Úrr&   r&   r'   r   Ÿ   s    z$TCP._get_finalizer.<locals>.finalize)rC   Úrepr)r\   r   r&   r&   r'   rW   ž   s    zTCP._get_finalizerc             C   s   | j S )N)rT   )r\   r&   r&   r'   Úlocal_address¦   s    zTCP.local_addressc             C   s   | j S )N)rU   )r\   r&   r&   r'   Úpeer_addressª   s    zTCP.peer_addressNc          
   c   s>  | j }|d krt‚y¦| d¡V }t d|¡d }| d| ¡V }t d| |¡}g }x`|D ]X}|r¤tr–| jr–t|ƒ}| |¡V }||ks¢t	||fƒ‚q¨| |¡V }nd}| 
|¡ qZW W n8 tk
rð }	 zd | _ tƒ sàt| |	ƒ W d d }	~	X Y nJX yt|| j|dV }
W n& tk
r.   |  ¡  tdƒ‚Y nX t |
¡‚d S )Né   ÚQr   ó    )rV   Údeserializersz aborted stream on truncated data)rC   r   Z
read_bytesÚstructZunpackr   Ú_iostream_has_read_intoÚ	bytearrayrS   r6   Úappendr   r   rP   r!   rV   ÚEOFErrorÚabortr   ÚReturn)r\   rf   rC   Zn_framesÚlengthsÚframesÚlengthÚframeÚnrE   Úmsgr&   r&   r'   Úread®   s<    

zTCP.readÚmessagec          
   c   sl  | j }d}|d krt‚t|||| j| jdœdV }y°dd„ |D ƒ}t dt|ƒ¡gdd„ |D ƒ }trŒt	|ƒdk rŒd	 
|| ¡}	| |	¡ nT| d	 
|¡¡ xB|D ]:}
| js´t|
ƒ}
| |
¡}|t|
ƒ7 }|d
kr¢|V  d}q¢W W np tk
r } zd }t| |ƒ W d d }~X Y n@ tk
rR } z |jd kr@t d|¡ n‚ W d d }~X Y nX t t	tt|ƒƒ¡‚d S )Nr   )ZsenderZ	recipient)ÚserializersÚon_errorÚcontextc             S   s   g | ]}t |ƒ‘qS r&   )r   )Ú.0rq   r&   r&   r'   ú
<listcomp>ã   s    zTCP.write.<locals>.<listcomp>rd   c             S   s   g | ]}t  d |¡‘qS )rd   )rg   Úpack)ry   Úxr&   r&   r'   rz   å   s    i   re   g    €„~Az*tried to write message %s on closed stream)rC   r   r    rT   rU   rg   r{   Úlenr   ÚsumÚjoinÚwriteÚ_iostream_allows_memoryviewr   r   r   rP   Ú	TypeErrorZ_write_bufferr;   Úinfor   rm   Úmap)r\   rs   rv   rw   rC   Zbytes_since_last_yieldro   rn   Zlength_bytesÚbrq   ZfuturerE   r&   r&   r'   r€   Õ   sB    

z	TCP.writec             c   sx   | j d  }| _ |d k	rt| ¡ stz@y&| ¡ r6| d¡V  |j tj¡ W n tk
rZ   Y nX W d | j 	¡  | 
¡  X d S )Nre   )rC   r0   Zwritingr€   r5   ZshutdownZ	SHUT_RDWRrA   rX   Údetachr^   )r\   rC   r&   r&   r'   r^     s    

z	TCP.closec             C   s4   | j d  }| _ |d k	r0| ¡ s0| j ¡  | ¡  d S )N)rC   r0   rX   r†   r^   )r\   rC   r&   r&   r'   rl     s    
z	TCP.abortc             C   s   | j d kp| j  ¡ S )N)rC   r0   )r\   r&   r&   r'   r0     s    z
TCP.closedc             C   s   | j S )N)rZ   )r\   r&   r&   r'   Ú
extra_info  s    zTCP.extra_info)T)N)Nru   )rM   Ú
__module__Ú__qualname__Ú__doc__ÚtornadoÚversion_infor   Úhasattrr   rh   r]   r[   rW   Úpropertyra   rb   r   Ú	coroutinert   r€   r^   rl   r0   r‡   r&   r&   r&   r'   rQ   …   s    


&,rQ   c               @   s   e Zd ZdZdd„ ZdS )ÚTLSz(
    A TLS-specific version of TCP.
    c             C   sZ   t  | ¡ | jj}|d k	rV| jj| ¡ | ¡ d | jd \}}}t 	d| j
|||¡ d S )N)ZpeercertÚcipherr‘   z7TLS connection with %r: protocol=%s, cipher=%s, bits=%d)rQ   r[   rC   r5   rZ   ÚupdateZgetpeercertr‘   r;   r<   rU   )r\   rD   r‘   ÚprotoÚbitsr&   r&   r'   r[   $  s    
zTLS._read_extraN)rM   rˆ   r‰   rŠ   r[   r&   r&   r&   r'   r     s   r   c             C   s*   |   d¡}t|tjƒs&tdt|ƒ ƒ‚|S )NZssl_contextzsTLS expects a `ssl_context` argument of type ssl.SSLContext (perhaps check your TLS configuration?)  Instead got %s)r3   rJ   rI   Z
SSLContextr‚   Ústr)Úconnection_argsÚctxr&   r&   r'   Ú_expect_tls_context/  s
    
r˜   c               @   s   e Zd Zdd„ ZdS )ÚRequireEncryptionMixinc             C   s(   | j s$| d¡r$td| j| f ƒ‚d S )NZrequire_encryptionzLencryption required by Dask configuration, refusing communication from/to %r)Ú	encryptedr3   ÚRuntimeErrorÚprefix)r\   Úaddressr–   r&   r&   r'   Ú_check_encryption:  s    z(RequireEncryptionMixin._check_encryptionN)rM   rˆ   r‰   rž   r&   r&   r&   r'   r™   8  s   r™   c               @   sJ   e Zd Zer(edddZejdedZndZe	edZ
ejdd	d
„ƒZdS )ÚBaseTCPConnectorr   zTCP-Executor)Zthread_name_prefixF)Zclose_executorZexecutorN)ZresolverTc       
   
   k   s¸   |   ||¡ t|ƒ\}}| jf |Ž}y8tjj||fdti|—ŽV }| ¡ rZ|jrZt	|jƒ‚W n, t	k
rˆ } zt
| |ƒ W d d }~X Y nX | jt|ƒ }	t |  ||	| j| |¡¡‚d S )NÚmax_buffer_size)rž   r   Ú_get_connect_argsrŸ   ÚclientÚconnectÚMAX_BUFFER_SIZEr0   Úerrorr   rP   rœ   rG   r   rm   Ú
comm_class)
r\   r   rV   r–   ÚipÚportÚkwargsrC   rE   ra   r&   r&   r'   r£   K  s     

zBaseTCPConnector.connect)T)rM   rˆ   r‰   r   r   Z	_executorr   ZExecutorResolverZ	_resolverr	   r¢   r   r   r£   r&   r&   r&   r'   rŸ   B  s   

rŸ   c               @   s    e Zd ZdZeZdZdd„ ZdS )ÚTCPConnectorztcp://Fc             K   s   i S )Nr&   )r\   r–   r&   r&   r'   r¡   m  s    zTCPConnector._get_connect_argsN)rM   rˆ   r‰   rœ   rQ   r¦   rš   r¡   r&   r&   r&   r'   rª   h  s   rª   c               @   s    e Zd ZdZeZdZdd„ ZdS )ÚTLSConnectorztls://Tc             K   s   t |ƒ}d|iS )NÚssl_options)r˜   )r\   r–   r—   r&   r&   r'   r¡   v  s    zTLSConnector._get_connect_argsN)rM   rˆ   r‰   rœ   r   r¦   rš   r¡   r&   r&   r&   r'   r«   q  s   r«   c               @   s\   e Zd Zddd„Zdd„ Zdd„ Zd	d
„ Zejdd„ ƒZ	dd„ Z
edd„ ƒZedd„ ƒZdS )ÚBaseTCPListenerTr   c             K   sH   |   ||¡ t||ƒ\| _| _|| _|| _| jf |Ž| _d | _d | _	d S )N)
rž   r   r§   r¨   Úcomm_handlerrV   Ú_get_server_argsÚserver_argsÚ
tcp_serverÚbound_address)r\   r   r®   rV   Zdefault_portr–   r&   r&   r'   r]   }  s    zBaseTCPListener.__init__c             C   s°   t f dti| j—Ž| _| j| j_ttj 	d¡ƒ}xzt
dƒD ]j}ytj| j| j|d}W n> tk
r” } z | jdks~|jtjkr€‚ |}W d d }~X Y q:X | j |¡ P q:W |‚d S )Nr    zdistributed.comm.socket-backlogrR   )r   Úbacklogr   )r
   r¤   r°   r±   Ú_handle_streamZhandle_streamr4   r1   r2   r3   Úranger   Zbind_socketsr¨   r§   rA   ÚerrnoZ
EADDRINUSEZadd_sockets)r\   r³   ÚiZsocketsrE   rO   r&   r&   r'   Ústart‡  s    

zBaseTCPListener.startc             C   s"   | j d  }| _ |d k	r| ¡  d S )N)r±   Ústop)r\   r±   r&   r&   r'   r¹      s    zBaseTCPListener.stopc             C   s   | j d krtdƒ‚d S )Nz,invalid operation on non-started TCPListener)r±   Ú
ValueError)r\   r&   r&   r'   Ú_check_started¥  s    
zBaseTCPListener._check_startedc             c   sn   | j t|d d… Ž  }|  ||¡V }|d kr0d S t d|| j¡ | j t|ƒ }|  |||| j¡}|  	|¡ d S )Nr   z!Incoming connection from %r to %r)
rœ   r   Ú_prepare_streamr;   r<   Úcontact_addressrG   r¦   rV   r®   )r\   rC   r   ra   Zcommr&   r&   r'   r´   ©  s    
zBaseTCPListener._handle_streamc             C   s,   |   ¡  | jdkrt| jƒ| _| jdd… S )z@
        The listening address as a (host, port) tuple.
        Nr   )r»   r²   r"   r±   )r\   r&   r&   r'   Úget_host_port¶  s    
zBaseTCPListener.get_host_portc             C   s   | j t|  ¡ Ž  S )z4
        The listening address as a string.
        )rœ   r   r¾   )r\   r&   r&   r'   Úlisten_addressÁ  s    zBaseTCPListener.listen_addressc             C   s$   |   ¡ \}}t|ƒ}| jt||ƒ S )z2
        The contact address as a string.
        )r¾   r#   rœ   r   )r\   Úhostr¨   r&   r&   r'   r½   È  s    zBaseTCPListener.contact_addressN)Tr   )rM   rˆ   r‰   r]   r¸   r¹   r»   r   r   r´   r¾   rŽ   r¿   r½   r&   r&   r&   r'   r­   {  s    
	r­   c               @   s.   e Zd ZdZeZdZdd„ Zej	dd„ ƒZ
dS )ÚTCPListenerztcp://Fc             K   s   i S )Nr&   )r\   r–   r&   r&   r'   r¯   ×  s    zTCPListener._get_server_argsc             C   s   t  |¡‚d S )N)r   rm   )r\   rC   r   r&   r&   r'   r¼   Ú  s    zTCPListener._prepare_streamN)rM   rˆ   r‰   rœ   rQ   r¦   rš   r¯   r   r   r¼   r&   r&   r&   r'   rÁ   Ò  s
   rÁ   c               @   s.   e Zd ZdZeZdZdd„ Zej	dd„ ƒZ
dS )ÚTLSListenerztls://Tc             K   s   t |ƒ}d|iS )Nr¬   )r˜   )r\   r–   r—   r&   r&   r'   r¯   ä  s    zTLSListener._get_server_argsc             c   s^   y|  ¡ V  W n@ tk
rN } z"t d| j|t|dd ƒp:|¡ W d d }~X Y nX t |¡‚d S )Nz7Listener on %r: TLS handshake failed with remote %r: %srH   )Zwait_for_handshakerA   r;   rB   r¿   Úgetattrr   rm   )r\   rC   r   rE   r&   r&   r'   r¼   è  s    $zTLSListener._prepare_streamN)rM   rˆ   r‰   rœ   r   r¦   rš   r¯   r   r   r¼   r&   r&   r&   r'   rÂ   ß  s
   rÂ   c               @   s<   e Zd Zdd„ Zdd„ Zdd„ Zdd„ Zd	d
„ Zdd„ ZdS )ÚBaseTCPBackendc             C   s   |   ¡ S )N)Ú_connector_class)r\   r&   r&   r'   Úget_connectorù  s    zBaseTCPBackend.get_connectorc             K   s   | j |||f|ŽS )N)Ú_listener_class)r\   ÚlocZhandle_commrV   r–   r&   r&   r'   Úget_listenerü  s    zBaseTCPBackend.get_listenerc             C   s   t |ƒd S )Nr   )r   )r\   rÈ   r&   r&   r'   Úget_address_host  s    zBaseTCPBackend.get_address_hostc             C   s   t |ƒS )N)r   )r\   rÈ   r&   r&   r'   Úget_address_host_port  s    z$BaseTCPBackend.get_address_host_portc             C   s   t |ƒ\}}tt|ƒ|ƒS )N)r   r   r   )r\   rÈ   rÀ   r¨   r&   r&   r'   Úresolve_address  s    zBaseTCPBackend.resolve_addressc             C   s8   t |ƒ\}}t|ƒ}d|kr&t|ƒ}nt|ƒ}t|d ƒS )Nú:)r   r   r   r   r   )r\   rÈ   rÀ   r¨   Z
local_hostr&   r&   r'   Úget_local_address_for  s    
z$BaseTCPBackend.get_local_address_forN)	rM   rˆ   r‰   rÆ   rÉ   rÊ   rË   rÌ   rÎ   r&   r&   r&   r'   rÄ   õ  s   rÄ   c               @   s   e Zd ZeZeZdS )Ú
TCPBackendN)rM   rˆ   r‰   rª   rÅ   rÁ   rÇ   r&   r&   r&   r'   rÏ     s   rÏ   c               @   s   e Zd ZeZeZdS )Ú
TLSBackendN)rM   rˆ   r‰   r«   rÅ   rÂ   rÇ   r&   r&   r&   r'   rÐ     s   rÐ   ZtcpZtls)IZ
__future__r   r   r   r¶   Zloggingr5   rg   r8   rI   r%   r1   r‹   r   r   Ztornado.iostreamr   r   Ztornado.tcpclientr	   Ztornado.tcpserverr
   Zcompatibilityr   r   Zthreadpoolexecutorr   Zutilsr   r   r   r   r   r   r   Úregistryr   r   Z
addressingr   r   Zcorer   r   r   r   r   r    r!   r"   r#   Z	getLoggerrM   r;   r(   r¤   rF   rG   rP   rQ   r   r˜   Úobjectr™   rŸ   rª   r«   r­   rÁ   rÂ   rÄ   rÏ   rÐ   r&   r&   r&   r'   Ú<module>   sX   
$
9 	
&	
W 
