B
    M^\                 @   s<  d dl mZmZmZ d dlZd dlmZ d dlZd dlm	Z	 d dl
Z
d dlZd dlmZ d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlmZ d dlZd dlZyd dlZW n e k
r
   dZY nX d dl!Z!d dl"Z"d dl#Z#d dl$m%Z%m&Z&m'Z' d dl(m)Z)m*Z* d d	l+m,Z, d d
l-m.Z. ddl/m0Z0m1Z1m2Z2 ddl3m4Z4m5Z5m6Z6m7Z7 ddl8m9Z9 ddl:m;Z; ddl<m=Z=m>Z>m?Z? ddl@mZ ddlAmBZB ddlCmDZD ddlEmFZF ddlGmHZHmIZImJZJmKZKmLZLmMZMmNZNmOZOmPZPmQZQ ddlRmSZSmTZTmUZU yd dlVZ#W n e k
rN   Y nX eWeXZYdd ejZj[j\] D Z^e9dd _  e!j`dddd Zae!j`dddd  Zbe!j`ddd!d" Zce)jdd#d$ Zee!j`d%d& Zfe!j`d'd( Zge!j`d)d* Zhed+d, Zied-d. Zjeke#j:j:Zld/d0 Zmd1d2 Znd3d4 Zod5d6 Zpd7d8 Zqd9d: Zrd;d< Zsd=d> Ztd?d@ ZudAdB ZvdCdD ZwddFdGZxddHdIZyddJdKZzddLdMZ{ddNdOZ|ddPdQZ}dRdS Z~efdTdUZeeZe ZG dVdW dWeZdXdY ZdZd[ Ze)jddd\d]Zdd_d`Zejdakr0edb es4tndZi Ze)jddcdd Zdedf Zdgdh Zdidj ZeddkdlZe!j`dmdn Ze!j`dodp Ze!j`dqdr Ze!j`dsdt Ze!j`dudv Z/e!j`dwdx ZeddydzZe!j`d{d| Ze!j`d}d~ Ze!j`dd ZEeddi di fddZe)jddddZe)jddddZdd ZdddZddlmZ ddlRmSZS e)jddeSi i fddZe)jddd ZddgdddeSdi i i di d^fddZefddZdd Zedd ZdddZdddZe)jddddZe&dd Ze rdd Zne!jdZe)jddddZe)jdddefddZe)jddddZe)jddddZe)jddddZe)jddddZe)jddddZeejdfddZedd Zedd ZeddÄ Zeddń ZejejejeádơZddȄ Zddʄ Zdd̄ Zdd΄ ZddЄ ZdddՄZdddׄZddل Zddۄ ZdS )    )print_functiondivisionabsolute_importN)contextmanager)	timedelta)glob)sleep)mergememoizeassoc)genqueues)TimeoutError)IOLoop   )default_client_global_clientsClient)PY3EmptyWINDOWSPY2)offload)initialize_logging)connectrpcCommClosedError)time)_cleanup_dangling)enable_proctitle_on_children)Security)
ignoring
log_errors
mp_contextget_ipget_ipv6DequeHandlerreset_logger_lockssynciscoroutinefunctionthread_state)WorkerTOTAL_MEMORY_global_workersc             C   s$   i | ]\}}t |tjr|j|qS  )
isinstanceloggingZLoggerlevel).0nameloggerr.   r.   5lib/python3.7/site-packages/distributed/utils_test.py
<dictcomp>A   s    r6   c               C   s   d S )Nr.   r.   r.   r.   r5   <lambda>F   s    r7   Zsession)Zscopec             C   s   |  dd}|d |S )Ndatazfile.pyzprint('hello world!'))mktempjoinwrite)tmpdir_factory
local_filer.   r.   r5   valid_python_scriptI   s    
r>   c             C   s(   |  dd}d}|d| |S )Nr8   zdistributed_script.py)zfrom distributed import Clientze = Client('127.0.0.1:8989')zprint(e)
)r9   r:   r;   )r<   r=   linesr.   r.   r5   client_contract_scriptP   s    rA   c             C   s   |  dd}|d |S )Nr8   zfile.pyza+1)r9   r:   r;   )r<   r=   r.   r.   r5   invalid_python_scriptY   s    
rB   c              C   s&   x t D ]} |  } | jddd qW d S )NF)reportexecutor_wait)r-   _close)wr.   r.   r5   cleanup_global_workers`   s    
rG   c              #   sN  t d d = t  t } | jt      fdd}|| _| V  yt| t	dd | 
| j W nH tk
r } ztdt|s W d d }~X Y n tjk
r   Y n
X    W d Q R X t d d = t }x&ttrtd t |d k stqW t  trBx8t jdd	D ]$}ttj |  W d Q R X qW t  d S )
Nc                  s"       z
  W d    X d S )N)clearsetr.   )
is_stopped
orig_startr.   r5   startq   s    
zloop.<locals>.startg      ?)Zcallback_timeoutzIOLoop is clos(ed|ing)g?
   T)	recursive)r-   r   rH   pristine_looprL   	threadingEventrI   r(   rG   add_callbackstopRuntimeErrorrematchstrr   r   waitr   r   AssertionErrorr   r   psutilProcessZchildrenr!   ZNoSuchProcess	terminate)looprL   eZchildr.   )rJ   rK   r5   r]   g   s:    


r]   c           	   c   sn   t  ^} tj| jdd}d|_|  t }| |j |  | V  | | j	 |j
dd W d Q R X d S )Nztest IOLoop)targetr3   T   )timeout)rO   rP   ZThreadrL   daemonrQ   rR   rI   rX   rS   r:   )r]   threadZloop_startedr.   r.   r5   loop_in_thread   s    rd   c              c   s(   dd l } | j }|V  |jdd d S )Nr   )Zlinger)zmqZContextinstanceZdestroy)re   ctxr.   r.   r5   zmq_ctx   s    
rh   c              c   sz   t   t   t  } |   t  | ks.tz
| V  W d y| jdd W n ttfk
rb   Y nX t   t   X d S )NT)all_fds)	r   Zclear_instanceZclear_currentZmake_currentZcurrentrY   closeKeyError
ValueError)r]   r.   r.   r5   rO      s    
rO   c           
   #   s   dd l } ddlm} |   i  _d  _ fdd}| d|$ | d|  V  W d Q R X W d Q R X x|j D ]}|	  qvW |j
  d S )Nr   )remote_magicc                  s    S )Nr.   r.   )ipr.   r5   r$      s    zmock_ipython.<locals>.get_ipzIPython.get_ipythonz&distributed._ipython_utils.get_ipython)mockZdistributed._ipython_utilsrm   ZMockZuser_nsZkernelZpatchZ_clientsvaluesZstop_channelsrH   )ro   rm   r$   Zkcr.   )rn   r5   mock_ipython   s    rq   c               C   s$   t jj  t jjtt d S )N)daskconfigrH   updatecopydeepcopyoriginal_configr.   r.   r.   r5   reset_config   s    rx   c                s"   t s S t  fdd}|S )z
    A decorator to disable debug facilities during timing-sensitive tests.
    Warning: this doesn't affect already created IOLoops.
    c           	      s@   t jd}|d k	rt jd= z
 | |S |d k	r:|t jd< X d S )NPYTHONASYNCIODEBUG)osenvironget)argskwargsZold_asyncio_debug)funcr.   r5   wrapped   s    
znodebug.<locals>.wrapped)r   	functoolswraps)r   r   r.   )r   r5   nodebug   s    r   c             C   s$   t jd| _| jdk	r t jd= dS )za
    A setup_module() that you can install in a test module to disable
    debug facilities.
    ry   N)rz   r{   r|   _old_asyncio_debug)moduler.   r.   r5   nodebug_setup_module   s    
r   c             C   s   | j dk	r| j tjd< dS )ze
    A teardown_module() that you can install in a test module to reenable
    debug facilities.
    Nry   )r   rz   r{   )r   r.   r.   r5   nodebug_teardown_module   s    
r   c             C   s   | d S )Nr   r.   )xr.   r.   r5   inc  s    r   c             C   s   | d S )Nr   r.   )r   r.   r.   r5   dec  s    r   c             C   s   | | S )Nr.   )r   yr.   r.   r5   mul  s    r   c             C   s   | | S )Nr.   )r   r   r.   r.   r5   div  s    r   c             C   s   | dkrt | d S dS d S )Nr   r   T)deep)nr.   r.   r5   r     s    r   c             C   s   t dd S )Nzhello!)rT   )r   r.   r.   r5   throws  s    r   c             C   s   | d S )N   r.   )r   r.   r.   r5   double#  s    r   {Gz?c             C   s   t | | d S )Nr   )r   )r   delayr.   r.   r5   slowinc'  s    r   c             C   s   t | | d S )Nr   )r   )r   r   r.   r.   r5   slowdec,  s    r   c             C   s   t | d|  S )Nr   )r   )r   r   r.   r.   r5   
slowdouble1  s    r   c             C   s"   ddl m } t| |  | d S )Nr   )randomr   )r   r   )r   Zscaler   r.   r.   r5   	randominc6  s    r   c             C   s   t | | | S )N)r   )r   r   r   r.   r.   r5   slowadd<  s    r   c             C   s   t | t| S )N)r   sum)seqr   r.   r.   r5   slowsumA  s    r   c              O   s0   | dd}t| t| dkr(| d S | S d S )Nr   g{Gz?r   r   )r|   r   len)r}   r~   r   r.   r.   r5   slowidentityF  s
    r   c             C   s   | |  }x| |krqW dS )z*
    Burn CPU for *duration* seconds.
    Nr.   )ZdurationZtimerdeadliner.   r.   r5   run_forO  s    
r   c               @   s   e Zd Zdd Zdd ZdS )_ModuleSlotc             C   s   || _ || _d S )N)modnameslotname)selfr   r   r.   r.   r5   __init__^  s    z_ModuleSlot.__init__c             C   s   t tj| j | jS )N)getattrsysmodulesr   r   )r   r.   r.   r5   r|   b  s    z_ModuleSlot.getN)__name__
__module____qualname__r   r|   r.   r.   r.   r5   r   ]  s   r   c                s&   t tdtt fdd}|S )zi
    Return a function that returns a result (or raises an exception)
    from *items* at each call.
    _varying_dictc                 sN     } |  }|t kr"tn( | }|d | < t|trF|n|S d S )Nr   )r|   r   
IndexErrorr/   	Exception)Zdctir   )itemskeyslotr.   r5   r   r  s    
zvarying.<locals>.func)r   r   next_varying_key_gen)r   r   r.   )r   r   r   r5   varyingf  s    	
r   c             C   s   dd }|t t| fS )zi
    Like *varying*, but return the full specification for a map() call
    on multiple items lists.
    c             _   s
   | ||S )Nr.   )r   r}   r~   r.   r.   r5   apply  s    zmap_varying.<locals>.apply)mapr   )Z
itemslistsr   r.   r.   r5   map_varying  s    r   c             c   s   t |V  t | d d S )Nr   )r   r   Return)r   r   r.   r.   r5   geninc  s    r   Tc             C   s0   |rt | } t| dd} t }t| || d S )Nz	<dynamic>exec)textwrapdedentcompileglobalsr   )coder   nsr.   r.   r5   compile_snippet  s
    
r   )   r`   zp
        async def asyncinc(x, delay=0.02):
            await gen.sleep(delay)
            return x + 1
        c             #   sr   yt   W n> tk
rJ   t  t  < tj fdd}|  Y nX  V }|dkrdtn
t|dS )zR
    Read one message at a time from a comm that reads lists of
    messages.
    c              3   sX   xBy   V } W n tk
r&   P Y nX x| D ]}| q.W qW d  t = d S )N)readr   Z
put_nowait_readone_queues)Zmessagesmsg)commqr.   r5   background_read  s    

z readone.<locals>.background_readN)	r   rk   r   Queuer   	coroutiner|   r   r   )r   r   r   r.   )r   r   r5   readone  s    
r   c          
   K   sx   ddl m} t \}|f ddi|}|d}xt|D ]}| |j q:W z|  W d |jdd X W d Q R X d S )Nr   )	SchedulervalidateTz	127.0.0.1)ri   )distributedr   rO   rL   rangeputaddressrj   )r   Znputsr~   r   r]   	schedulerdoner   r.   r.   r5   run_scheduler  s    
r   c                s   ddl m} t  t  t p}| }||fddi| | fdd |  j z t	j
 fdd}|| W d |jdd	 X W d Q R X W d Q R X d S )
Nr   )r+   r   Tc                  s
     dS )Nr   )_startr.   )workerr.   r5   r7     s    zrun_worker.<locals>.<lambda>c               3   s    j  V  d S )N)Z_closedrX   r.   )r   r.   r5   wait_until_closed  s    z%run_worker.<locals>.wait_until_closed)ri   )r   r+   r'   r"   rO   r|   run_syncr   r   r   r   rj   )r   scheduler_qr~   r+   r]   scheduler_addrr   r.   )r   r5   
run_worker  s    r   c                s   ddl m} t z t h}| }||fddi| | fdd |  j z|  W d | j	 |j
dd X W d Q R X W d Q R X d S )Nr   )Nannyr   Tc                  s
     dS )Nr   )r   r.   )r   r.   r5   r7     s    zrun_nanny.<locals>.<lambda>)ri   )r   r   r"   rO   r|   r   r   r   rL   rE   rj   )r   r   r~   r   r]   r   r.   )r   r5   	run_nanny  s    r   c             #   sZ   t tj  r$ts$t  t tj d V   fddtj fdd}| | d S )Nc                  s   t dttj    d S )Nz!some RPCs left active by test: %s)pytestfailrI   r   activer.   )active_beforer.   r5   r     s    zcheck_active_rpc.<locals>.failc               3   s   t  fdddV  d S )Nc                  s   t ttj  dkS )Nr   )r   rI   r   r   r.   )r   r.   r5   r7     s    z0check_active_rpc.<locals>.wait.<locals>.<lambda>)ra   	fail_func)async_wait_forr.   )r   active_rpc_timeoutr   r.   r5   rX     s    zcheck_active_rpc.<locals>.wait)	rI   r   r   r   gcZcollectr   r   r   )r]   r   rX   r.   )r   r   r   r5   check_active_rpc  s    

r   c          	   c   s$   t  \}}||fV  W d Q R X d S )N)cluster)r]   r   workersr.   r.   r5   cluster_fixture  s    r   c             C   s   | \}}|S )Nr.   )r   r   r   r.   r.   r5   s  s    r   c             C   s   | \}}|d S )Nr   r.   )r   r   r   r.   r.   r5   a%  s    r   c             C   s   | \}}|d S )Nr   r.   )r   r   r   r.   r.   r5   b+  s    r   c          	   c   s.   |\}}t |d | d}|V  W d Q R X d S )Nr   )r]   )r   )r]   r   r   r   clientr.   r.   r5   r   1  s    r   c          	   c   s.   |\}}t |d | d}|V  W d Q R X d S )Nr   )r]   )r   )r]   r   r   r   r   r.   r.   r5   client_secondary8  s    r   c          	   k   s\   |pt  }t| pi d|} t|p"i d|}tf | |d|\}}||fV  W d Q R X d S )Nsecurity)worker_kwargsscheduler_kwargs)tls_only_securityr   r   )r   r   r   r~   r   r   r.   r.   r5   tls_cluster_context?  s    
r   c          	   c   s(   t |d\}}||fV  W d Q R X d S )N)r   )r   )r]   r   r   r   r.   r.   r5   tls_clusterL  s    r   c          	   c   s0   | \}}t |d ||d}|V  W d Q R X d S )Nr   )r   r]   )r   )r   r]   r   r   r   r   r.   r.   r5   
tls_clientR  s    r   c               C   s   t  S )N)r   r.   r.   r.   r5   r   Y  s    r   r   Fc             #   s  t  }t  x$t D ]\}}t|| qW t  t	 <}t
||$ |r\t}	nt}	t }
tjt|
| d f|d}|| d|_|  g xft| D ]Z}t }dt  }td|td|}tj|	||
f|d}|| |||d qW xD ]}|d   q
W y(x"D ]}|d jd	d
|d< q(W W n" tk
rj   tjdY nX |
 t }zy|d }d| di W n t!k
r   i  Y nX t"f B}x:|#|j$}t%|| krP t | d	krtdqW W d Q R X didd D fV  W d t&'d |# fdd |# fdd |(  |
)  |
j*)  |
j+)  xBD ]:}|d (  |d )  |d j*)  |d j+)  q~W |,d ~x$dd D D ]}|j,dd
 qW t-t. ~~~W d Q R X d d = x0t/dD ]$}t-t0 t12| W d Q R X qW X y
t3 }W n t4k
rf   Y n
X |)  W d Q R X W d Q R X t }x0t5|rt6d t |d k st7dqW d S )Nr   )r_   r}   r~   Tz_test_worker-%s)ncoresZ	local_dirmemory_limit)procqueuedirr   r   r`   )ra   r   zWorker failed to start in testr   connection_argsr   zTimeout on cluster creationc             S   s$   g | ]}|d  t |d dqS )r   r   )r   r   )weakrefref)r2   rF   r.   r.   r5   
<listcomp>  s   zcluster.<locals>.<listcomp>zClosing out test clusterc                  s   t dd D d dS )Nc             S   s   g | ]}|d  qS )r   r.   )r2   rF   r.   r.   r5   r     s    z-cluster.<locals>.<lambda>.<locals>.<listcomp>g      ?)ra   
rpc_kwargs)disconnect_allr.   )r  r   r.   r5   r7     s   zcluster.<locals>.<lambda>c                  s   t d dS )Ng      ?)ra   r  )
disconnectr.   )r  saddrr.   r5   r7     s    r   c             S   s   g | ]}|d  qS )r   r.   )r2   rF   r.   r.   r5   r     s    z_test_worker-*g{Gz?z%Workers still around after one second)8r   WeakSetrx   logging_levelsr   r0   	getLoggersetLevelr   rO   r   r   r   r#   r   r[   r   addrb   rL   r   uuidZuuid4r	   r,   appendr|   r   r   Zxfailr   r   Zget_connection_argsrk   r   r   r   r   r4   debugr\   rj   Z_readerZ_writerr:   r!   UnboundLocalErrorr   OSErrorshutilZrmtreer   rl   listr   rY   )ZnworkersZnannyr   r   r   wsr3   r1   r]   Z_run_workerr   r   r   r   fnr~   r   r   rL   r   r   r   rF   r   r.   )r  r  r   r5   r   ^  s    















r   r   c          	   #   sJ   pi t j fdd}tt t t|d| V  W d Q R X d S )Nc           
   3   s@   t tt, t f} | jddV  W d Q R X W d Q R X d S )NT)rj   )r!   EnvironmentErrorr   r   r\   )rF   )addrr  r.   r5   do_disconnect  s    z!disconnect.<locals>.do_disconnect)seconds)r   r   r!   r   with_timeoutr   )r  ra   r  r  r.   )r  r  r5   r    s    
r  c             #   s    fdd| D V  d S )Nc                s   g | ]}t | qS r.   )r  )r2   r  )r  ra   r.   r5   r     s    z"disconnect_all.<locals>.<listcomp>r.   )Z	addressesra   r  r.   )r  ra   r5   r    s    r  c             C   s>   y t jdst jd| } W n tk
r4   Y nX t| S )Nz	--runslowzneed --runslow option to run)r   rs   Z	getoptionmarkskipAttributeErrorr   )r   r.   r.   r5   slow  s    r  rM   c                s    fdd}|S )zn Coroutine test

    @gen_test(timeout=5)
    def test_foo():
        yield ...  # use tornado coroutines
    c                s    fdd}|S )Nc           	      sL   t  <} t r }n
t }z| j|d W d |   X W d Q R X d S )N)ra   )rO   r)   r   r   r   rS   )r]   Zcor)r   ra   r.   r5   	test_func  s    
z&gen_test.<locals>._.<locals>.test_funcr.   )r   r  )ra   )r   r5   _  s    
zgen_test.<locals>._r.   )ra   r  r.   )ra   r5   gen_test  s    r  )r   )r+   c             #   s   t f dd||} fddt| D }x|D ]}	|d j|	_qDW dd t| |D V  t }
xntjt| k stdd j	
 D rtd	V  t |
 d
krvdd |D V  jddV  tdqvW t|fd S )NT)r]   r   r   c          
      sL   g | ]D\}} j f|d  |ddt|dkr@t|d nqS )r   T)r   r3   r   r]   r   r   )r   r   r	   )r2   r   ncore)r+   r]   r   r   r   r.   r5   r     s   z!start_cluster.<locals>.<listcomp>r   c             S   s   g | ]\}}| |d  qS )r   )r   )r2   r  rF   r.   r.   r5   r     s    c             s   s   | ]}|j d kV  qd S )N)r   )r2   r   r.   r.   r5   	<genexpr>  s    z start_cluster.<locals>.<genexpr>g{Gz?r`   c             S   s   g | ]}|j d dqS )r   )ra   )rE   )r2   rF   r.   r.   r5   r   "  s    )fastzCluster creation timeout)r   rL   	enumerater   zipr   r   r   anyZstream_commsrp   r   r   rj   r   r   )r   r   r]   r   r+   r   r   r   r   rF   rL   r.   )r+   r]   r   r   r   r5   start_cluster  s"    

r%  c             #   sB   t d tjdd   fdd|D V  |  V  |   d S )NzClosing out test clusterc          	   s   s*   t ttt | jddV  W d Q R X d S )NF)rC   )r!   r   r   r  rE   )rF   r.   r.   r5   
end_worker,  s    zend_cluster.<locals>.end_workerc                s   g | ]} |qS r.   r.   )r2   rF   )r&  r.   r5   r   1  s    zend_cluster.<locals>.<listcomp>)r4   r  r   r   rj   rS   )r   r   r.   )r&  r5   end_cluster(  s
    

r'  )z	127.0.0.1r   )z	127.0.0.1r   z	127.0.0.1c                sD   ddl m  ttdd 	
fdd}|S )Nr   )r   r`   )r   Zdeath_timeoutc                s<   t  st   	
fdd}|S )Nc                 s  t d d = t  ttj t  tjddi x$t	
 D ]\} }t| | q<W d }g t tL tj	
fdd}j|rd nd}W d Q R X x@D ]8}t|dd ry|j  W n tk
r   Y nX |`qW t  x6t D ].}| }|jddd	 |jd
kr|  qW t d d = W d Q R X trtsrt }x fddtj
 D }|sP ntd t |d krtddlm } |d }tj| }	|!t"# | }
dstt$|	|
fqtW t%  t&t'
 t(`)W d Q R X |S )Nz!distributed.comm.timeouts.connectZ5sc              3   s  t jr d} xztdD ]n}y t
	dV \} }W n0 tk
rr } ztjddd W d d }~X Y qX |d d < | g }P qW | dkrtdrʈ | jf
ddV }|g| }z6| }rt	
td	|}|V }| jr|   W d r|j| jd
kdV  t| V  t	
tdd	t V  X yt V }W n tk
rb   Y nX |jddV  t	|W d Q R X d S )NFr`   )r   r+   r   r   z%Failed to start gen_cluster, retryingT)exc_infozCould not start cluster)r]   r   Zasynchronous)r  closed)r!  r   )rr   rs   rI   r   r%  r   r4   errorr   r   r  r   r   Zvalidate_staterE   statusr'  rG   r   rl   r   )r   r   r  r^   r}   cZfutureresult)r   r+   r   client_kwargsrs   r   r]   r   r   r   r   ra   r   r   r.   r5   coro_  sL     

z7gen_cluster.<locals>._.<locals>.test_func.<locals>.coror   )ra   r8   F)rC   rD   Zrunningc                s:   g | ]2\}}| krd |j krd|j krd|j kr|qS )ZThreadedzwatch messagezTCP-Executor)r3   )r2   tv)active_threads_startr.   r5   r     s
    

z=gen_cluster.<locals>._.<locals>.test_func.<locals>.<listcomp>g{Gz?r`   r   )profile)*r-   r   rH   rI   rP   Z_activerx   rr   rs   r  r   r0   r  r  rO   r   r   r   r   r   r8   r  r&   Zclear_all_instancesrE   r+  rj   r   r   r   r   r   r3  Z
call_stackr   _current_framesrY   r   r!   r  r*   Zon_event_loop_thread)r3   r1   r-  r/  rF   rL   Zbadr3  tidrc   Zcall_stacks)r   r+   r   check_new_threadsr   r.  rs   r   r   r   r   r   ra   r   )r2  r]   r   r5   r  M  sX    

,.$



z)gen_cluster.<locals>._.<locals>.test_func)r)   r   r   )r   r  )r   r+   r   r6  r   r.  rs   r   r   r   r   ra   r   )r   r5   r  I  s    
&jzgen_cluster.<locals>._)r   r   r	   r,   )r   r   ra   r   r+   r   r   r   r.  r   rs   r6  r  r.   )r   r+   r   r6  r   r.  rs   r   r   r   r   ra   r   r5   gen_cluster6  s
    
$or7  c             C   s$   y
|   dS  |k
r   dS X d S )NFTr.   )r   excr.   r.   r5   raises  s
    r9  c             C   s   |   d krtjdr&| tj n| tj zLtjd dkrN| 	d n.t
 }x&|   d krzt
 |d k rztd qVW W d tt |   W d Q R X X d S )Nwinr   r   rM   g{Gz?)Zpollr   platform
startswithZsend_signalsignalZCTRL_BREAK_EVENTSIGINTversion_inforX   r   r   r!   r  kill)r   rL   r.   r.   r5   terminate_process  s    
rA  c             k   s  t j|d< t j|d< tjdr*t j|d< d}t| } tjdr^tj	tj
d| d | d< ntj	tj
d| d | d< t j| f|}z*y
|V  W n tk
r   d	} Y nX W d zt| W d | \}}|rtd
| d d   t|  td t|  X X d S )Nstdoutstderrr:  ZcreationflagsFZScriptsr   binTz+

Print from stderr
  %s
=================
z&

Print from stdout
=================
)
subprocessPIPEr   r;  r<  ZCREATE_NEW_PROCESS_GROUPr  rz   pathr:   prefixPopenr   rA  Zcommunicateprintdecode)r}   r~   Zdump_stdoutr   outerrr.   r.   r5   popen  s.    



rN  r`   c             C   st   t | tstt | }xV|t  }|dk r:td| f ytj| |d}W n tk
r`   Y qX |  P qW d S )Nr   zFailed to connect to %s)ra   )	r/   tuplerY   r   rT   socketcreate_connectionr  rj   )r   ra   r   Zsockr.   r.   r5   wait_for_port  s    

rR  MbP?c             C   sJ   t  | }x:|  sDt| t  |kr|d k	r2|  td|f  qW d S )Nz&condition not reached until %s seconds)r   r   r   r   )	predicatera   r   periodr   r.   r.   r5   wait_for
  s    

rV  c             c   sN   t  | }x>|  sHt|V  t  |kr|d k	r6|  td|f  qW d S )Nz&condition not reached until %s seconds)r   r   r   r   r   )rT  ra   r   rU  r   r.   r.   r5   r     s    

r   c              C   s   d } }z\y>t  t jt j} | d | d t |  dd }W n tk
r\   dS X dS W d|dk	rv|  | dk	r|   X dS )z
    Return whether IPv6 is locally functional.  This doesn't guarantee IPv6
    is properly configured outside of localhost.
    N)z::r   r`   r   FT)	rP  ZAF_INET6ZSOCK_STREAMZbindZlistenrQ  Zgetsocknamer  rj   )ZservZclir.   r.   r5   has_ipv6  s    

rW  c             C   s   | S )Nr.   )r  r.   r.   r5   requires_ipv67  s    rX  zipv6 requiredc             c   s(   |dkrd}t | ||dV }|  dS )zh
    Check that it is possible to connect to the distributed *addr*
    within the given *timeout*.
    Ng      ?)ra   r   )r   abort)r  ra   r   r   r.   r.   r5   assert_can_connect>  s
    
rZ  c          	   c   s>   |dkrd}t | t| ||dV }|  W dQ R X dS )zj
    Check that it is impossible to connect to the distributed *addr*
    within the given *timeout*.
    Ng      ?)ra   r   )r   r9  r   rY  )r  ra   r   Zexception_classr   r.   r.   r5   assert_cannot_connectK  s    
r[  tcpc             c   s|   ||f}t d|| f f| t d|t | f f| g}t rr|t d|| f f| t d|t | f f| g7 }|V  dS )zT
    Check that the local *port* is reachable from all IPv4 and IPv6 addresses.
    z%s://127.0.0.1:%dz
%s://%s:%dz%s://[::1]:%dz%s://[%s]:%dN)rZ  r$   rW  r%   )portra   r   protocolr}   futuresr.   r.   r5   &assert_can_connect_from_everywhere_4_6Y  s    r`  c             c   s|   ||f}t d|| f f| t d|t | f f| g}t rr|td|| f f| td|t | f f| g7 }|V  dS )zK
    Check that the local *port* is reachable from all IPv4 addresses.
    z%s://127.0.0.1:%dz
%s://%s:%dz%s://[::1]:%dz%s://[%s]:%dN)rZ  r$   rW  r[  r%   )r]  ra   r   r^  r}   r_  r.   r.   r5   $assert_can_connect_from_everywhere_4k  s    ra  c             c   s   ||f}t d|  f| g}t dkrD|tdt | f f| g7 }t rx|td|  f| tdt | f f| g7 }|V  dS )zR
    Check that the local *port* is only reachable from local IPv4 addresses.
    ztcp://127.0.0.1:%dz	127.0.0.1ztcp://%s:%dztcp://[::1]:%dztcp://[%s]:%dN)rZ  r$   r[  rW  r%   )r]  ra   r   r}   r_  r.   r.   r5   assert_can_connect_locally_4}  s    
rb  c             c   sl   t  s
t||f}td|  f| tdt | f f| td|  f| tdt | f f| g}|V  dS )zK
    Check that the local *port* is reachable from all IPv6 addresses.
    ztcp://127.0.0.1:%dztcp://%s:%dztcp://[::1]:%dztcp://[%s]:%dN)rW  rY   r[  r$   rZ  r%   )r]  ra   r   r}   r_  r.   r.   r5   $assert_can_connect_from_everywhere_6  s    
rc  c             c   s~   t  s
t||f}td|  f| tdt | f f| td|  f| g}t dkrt|tdt | f f| g7 }|V  dS )zR
    Check that the local *port* is only reachable from local IPv6 addresses.
    ztcp://127.0.0.1:%dztcp://%s:%dztcp://[::1]:%dz::1ztcp://[%s]:%dN)rW  rY   r[  r$   rZ  r%   )r]  ra   r   r}   r_  r.   r.   r5   assert_can_connect_locally_6  s    

rd  c          
   c   s   t | trt| } | j}| jdd }|dk	r<| j}|| _t }t	|g| jdd< | 
| z
|V  W d|| jdd< | 
| |dk	r|| _X dS )z*Capture output from the given Logger.
    N)r/   rW   r0   r  r1   Zhandlers	propagatesixStringIOStreamHandlerr  )r4   r1   re  Z
orig_levelZorig_handlersZorig_propagateZsior.   r.   r5   captured_logger  s     




ri  c             c   s:   t | tjst| j}t | _z| jV  W d|| _X dS )z9Capture output from the given logging.StreamHandler.
    N)r/   r0   rh  rY   streamrf  rg  )ZhandlerZorig_streamr.   r.   r5   captured_handler  s    
rk  c          	   c   st   ddl m} tj j }| }z6|  ||  tj ||  t| dV  W d|  || t| X dS )z6
    Temporarily change configuration dictionary.
    r   )defaultsN)rs   rl  rr   ru   rH   rt   r   )
new_configrl  rs   Zorig_configr.   r.   r5   rm    s    

rm  c          	   c   s>   t j }t j|  z
d V  W d t j  t j| X d S )N)rz   r{   ru   rt   rH   )ZchangesZsaved_environr.   r.   r5   new_environment  s    


rn  c          	   c   s   ddl }tjd}tjdd\}}z\t|d}|||  W dQ R X |tjd< z
dV  W d|rt|tjd< ntjd= X W dt	| X dS )zH
    Temporarily change configuration file to match dictionary *c*.
    r   NZDASK_CONFIGzdask-config)rH  rF   )
yamlrz   r{   r|   tempfileZmkstempfdopenr;   dumpremove)r,  ro  Zold_filefdrG  fr.   r.   r5   new_config_file  s    

rv  Ztestsc             C   s&   t jt| }t j|s"t||S )z;
    Get the path to one of the test TLS certificates.
    )rz   rG  r:   	certs_direxistsrY   )filenamerG  r.   r.   r5   get_cert  s    rz  c              C   s2   t d} t d}d| d|id|id|idi}|S )z=
    A functional TLS configuration with our test certs.
    ztls-ca-cert.pemztls-key-cert.pemZtlsZcert)zca-filer   r   r   )rz  )ca_fileZkeycertr,  r.   r.   r5   
tls_config  s    r|  c              C   s   t  } d| d< | S )zg
    A functional TLS configuration with our test certs, disallowing
    plain TCP communications.
    Tzrequire-encryption)r|  )r,  r.   r.   r5   tls_only_config4  s    r}  c           	   C   s    t t  t } W dQ R X | S )z:
    A Security object with proper TLS configuration.
    N)rm  r|  r    )secr.   r.   r5   tls_security>  s    r  c           	   C   s*   t t  t } W dQ R X | js&t| S )zg
    A Security object with proper TLS configuration and disallowing plain
    TCP communications.
    N)rm  r}  r    Zrequire_encryptionrY   )r~  r.   r.   r5   r   G  s    
r   tls-cert.pemtls-key.pemtls-ca-cert.pemc             C   s<   t jt jjt|d}d|_t j|_|t| t| |S )N)cafileF)	sslcreate_default_contextPurposeZCLIENT_AUTHrz  check_hostnameCERT_REQUIREDverify_modeload_cert_chain)certfilekeyfiler{  rg   r.   r.   r5   get_server_ssl_contextR  s    
r  c             C   s<   t jt jjt|d}d|_t j|_|t| t| |S )N)r  F)	r  r  r  ZSERVER_AUTHrz  r  r  r  r  )r  r  r{  rg   r.   r.   r5   get_client_ssl_context\  s    
r  c          
   C   st   t d}y0|| \}}||k r8|| |t||f W n4 tk
rn } zt d||f  W d d }~X Y nX d S )Nresourcez.rlimit too low (%s) and can't be increased: %s)r   ZimportorskipZ	getrlimitZ	setrlimitmaxr   r  )limitZdesiredr  ZsoftZhardr^   r.   r.   r5   bump_rlimitf  s    
r  c              K   s&   |  dddg tf dt d| S )Nr   )ztls://127.0.0.1r   )ztls://127.0.0.1r   ztls://127.0.0.1)r   r   )
setdefaultr7  r   )r~   r.   r.   r5   gen_tls_clusterr  s    r  )r   )r   )r   )r   )r   )r   )r   )T)r   )NNN)r   N)r   N)rM   )r`   )NrS  )NrS  )NN)NNr\  )NNr\  )NN)NN)NN)r  r  r  )r  r  r  )Z
__future__r   r   r   collections
contextlibr   ru   Zdatetimer   r   r   r   	itertoolsr0   Zlogging.configrz   rZ   rU   r  r=  rP  rE  r   rp  r   rP   r   r   r
  r   r  ImportErrorr   rf  rr   Ztoolzr	   r
   r   Ztornador   r   Ztornado.genr   Ztornado.ioloopr   r   r   r   r   Zcompatibilityr   r   r   r   Z
comm.utilsr   rs   r   Zcorer   r   r   ZmetricsZprocessr   Z	proctitler   r   r    Zutilsr!   r"   r#   r$   r%   r&   r'   r(   r)   r*   r   r+   r,   r-   Z
dask.arrayr  r   r4   rootZmanagerZ
loggerDictr   r  r-  Zfixturer>   rA   rB   r   rG   r]   rd   rh   rO   rq   rv   rw   rx   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   defaultdictintr   countr   objectr   r   r   r   r   r?  ZasyncincrY   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r  r  r  r  r   r   r%  r'  r7  r   r9  rA  rN  rR  rV  r   rW  rX  r  r  rZ  r  r[  r`  ra  rb  rc  rd  INFOri  rk  rm  rn  rv  rG  abspathr:   dirname__file__rw  rz  r|  r}  r  r   r  r  r  r  r.   r.   r.   r5   <module>   sh  
0
	0
	





	

	
  t
 #




	
	 
	 
	