B
    4\j                 @   sl  d dl mZ d dlmZ d dlZd dlZd dlmZ d dlm	Z	 d dl
mZ d dlmZmZmZ d dlmZ d d	lmZ d d
lmZmZmZ d dlmZ G dd deZedd Ze dd Ze dd Ze dd Ze dd Z e dd Z!e dd Z"e dd Z#eeddd d! Z$e	j%d"d# Z&e d$d% Z'ee d&d' Z(e d(d) Z)dS )*    )contextmanager)	timedeltaN)assoc)gen)BatchedSend)listenconnectCommClosedError)time)All)gen_testslowcaptured_logger)to_serializec               @   s&   e Zd ZdZejdd Zdd ZdS )
EchoServerr   c          
   c   sV   xPy(|  V }|  jd7  _||V  W q tk
rL } zd S d }~X Y qX qW d S )N   )readcountwriter	   )selfcommmsge r   =lib/python3.7/site-packages/distributed/tests/test_batched.pyhandle_comm   s    
zEchoServer.handle_commc             C   s(   t d| j}|  |j| _|j| _d S )N )r   r   startZcontact_addressaddressstop)r   Zlistenerr   r   r   r      s    zEchoServer.listenN)__name__
__module____qualname__r   r   	coroutiner   r   r   r   r   r   r      s   
r   c              c   s(   t  } |   z
| V  W d |   X d S )N)r   r   r   )Zserverr   r   r   echo_server%   s
    
r$   c           	   c   s   t  } t| jV }tdd}tt|jt|ks8ttt|jt|ksRt|	| t
dV  |d |d |d t
dV  |d |d | V }|dkst| V }|dkst|jd	kstW d Q R X d S )
N
   )intervalg{Gz?helloworldHELLO)r'   r'   r(   )r)   r)   r   )r$   r   r   r   strlenbufferAssertionErrorreprr   r   sleepsendr   Z
byte_count)r   r   bresultr   r   r   test_BatchedSend0   s$    








r3   c           	   c   s`   t  P} t| jV }tdd}|d |d || | V }|dksRtW d Q R X d S )Nr%   )r&   r'   r(   )r'   r(   )r$   r   r   r   r0   r   r   r-   )r   r   r1   r2   r   r   r   test_send_before_startK   s    




r4   c           	   c   sz   t  j} t| jV }tdd}|| |d |d | V }t|dk r`|| V 7 }|dksltW d Q R X d S )Nr%   )r&   r'   r(      )r'   r(   )	r$   r   r   r   r   r0   r   r+   r-   )r   r   r1   r2   r   r   r   test_send_after_stream_startZ   s    




r6   c           
   c   s   t  } t| jV }tdd}|| t| j}|d | V  |j	rPt
t }x0| j|d krtdV  t |d k sXt
qXW tt |d W d Q R X W d Q R X d S )Nr%   )r&   r'   r   g{Gz?   Z123)r$   r   r   r   r   intr   r0   closer,   r-   r
   r   r/   pytestraisesr	   )r   r   r1   Zcntr   r   r   r   test_send_before_closej   s    





r<   c           	   c   sr   t  b} t| jV }tdd}|| |d |  | V  dt|ksTtdt	|ksdtW d Q R X d S )Nr%   )r&   {   closed)
r$   r   r   r   r   r0   r9   r.   r-   r*   )r   r   r1   r   r   r   test_close_closed   s    



r?   c              c   s   t dd} |  V  d S )Nr%   )r&   )r   r9   )r1   r   r   r   test_close_not_started   s    
r@   c           	   c   sJ   t  :} t| jV }tdd}|| | V  | V  W d Q R X d S )Nr%   )r&   )r$   r   r   r   r   r9   )r   r   r1   r   r   r   test_close_twice   s    


rA   2   )Ztimeoutc           	   #   s~   t  n} t| jV g  tjfdd}tj fdd}t| | gV   ttdddksht	  W d Q R X d S )Nc              3   s^   t dd} |   xDtdddD ]4}| | | |d  tdtdd V  q"W d S )	N   )r&   r   i'  r5   r   gh㈵>r%   )r   r   ranger0   r   r/   randomrandint)r1   i)r   r   r   r0      s    


ztest_stress.<locals>.sendc              3   s:   x4t tdd V }  |  | d dkrP qW d S )Nr   )secondsi'  )r   with_timeoutr   r   extend)r2   )Lr   r   r   recv   s
    
ztest_stress.<locals>.recvr   i'  r   )
r$   r   r   r   r#   r   listrD   r-   r9   )r   r0   rM   r   )rL   r   r   test_stress   s    	rO   c          	   c   s`  t d}ddlm} t|jjdd|fddj}t	 }t
|jV }tdd}|| d	||i}x<t| D ]0}	|t|d
|	 |j dkrvtdV  qvW g }
d}xDt|
| k rttdd| V }|d7 }|
dd |D  qW ||j  kr|jksn t|j| ks*t|
tt| ks@t|  | V  W d Q R X d S )NZnumpyr   )r      )sizeZu1g{Gz?)r&   xrG   g      ?gMbP?r7   )rH   r   c             s   s   | ]}|d  V  qdS )rG   Nr   ).0rr   r   r   	<genexpr>   s    z"run_traffic_jam.<locals>.<genexpr>)r:   Zimportorskipdistributed.protocolr   bytesrE   rF   Zastypedatar$   r   r   r   r   rD   r0   r   r   r/   r+   rJ   r   r   rK   Zbatch_countr   r-   Zmessage_countrN   r9   )ZnsendsnbytesZnpr   rX   r   r   r1   r   rG   Zresultsr   rL   r   r   r   run_traffic_jam   s.    
 


"rZ   c               c   s   t ddV  d S )NrB   i )rZ   r   r   r   r   test_sending_traffic_jam   s    r[   c               c   s   t ddV  d S )Ni  i` )rZ   r   r   r   r   test_large_traffic_jam   s    r\   c           
   c   s$  t  } t| jV }tddgd}|| |dtdi |dtdi tdV  |dtdd	 i t	d
}tdV  W d Q R X |
 }d|kstd|kstd|kst| V }t|ddiddigkstttj ttdd| V }W d Q R X W d Q R X d S )NZ10msZmsgpack)r&   ZserializersrR   r=   r'   g?c             S   s   | d S )Nr   r   )rR   r   r   r   <lambda>   s    z"test_serializers.<locals>.<lambda>zdistributed.protocolZ	serializetypeZfunctiond   )Zmilliseconds)r$   r   r   r   r   r0   r   r   r/   r   getvaluer-   r   rN   r:   r;   TimeoutErrorrJ   r   )r   r   r1   Zsiovaluer   r   r   r   test_serializers   s$    



rc   )*
contextlibr   Zdatetimer   rE   r:   Ztoolzr   Ztornador   Zdistributed.batchedr   Zdistributed.corer   r   r	   Zdistributed.metricsr
   Zdistributed.utilsr   Zdistributed.utils_testr   r   r   rV   r   objectr   r$   r3   r4   r6   r<   r?   r@   rA   rO   r#   rZ   r[   r\   rc   r   r   r   r   <module>   s6   %