B
    OK\J                 @   sD  d dl mZmZmZ d dlZd dlmZ d dlZd dlZd dl	m
Z
 d dlZd dlZd dlmZmZ d dlmZ d dlmZmZmZmZ d dlmZ d d	lm	Z	 d d
lmZmZ d dlmZmZm Z m!Z!m"Z"m#Z# d dlm$Z$m%Z% d dl&m'Z' d dlZe$Z(e%Z)ej*j+ej,-d dde!dddgdddd Z.e!ddgd ddd Z/e!ddgd ddd Z0ej*j1e!ddgd ddd  Z2e!ddgd! dd"d# Z3e!ddgd dd$d% Z4e!ddgd dd&d' Z5ej*j6d(de!ddgd) dd*d+ Z7e!ddgd! d,dd-d. Z8e!ddgddd/d0 Z9e!ddd1d2d3 Z:e!dddgdd4d5 Z;ej*j+ej,-d dde!ddd6gdd7d8 Z<e!dd9d:d;d<diifdgdd=d> Z=ej*j6d?de!dd9d:d;d<diifgd)dd@dA Z>e!ddgdB dddCdD Z?e!ddEgd ddFdG Z@e!ddgd! ddHdI ZAe!ddgd! dJe'idKdLdM ZBe!ddgd! ddNdO ZCdPdQ ZDdRdS ZEej*FdTd:gg gd:gg gfd d gg gd gd ggfdUdUgg gd gd ggfd d d gg gd d gd ggfd d gd d d gg gd d gd d gd ggfd d d gd gg gd d gd gd ggfd d:gg gd:gd ggfd d d d gg g gd d gd gd ggfd:d dd gg g gdd:gd gd ggfd:d:d:gg gd:d:gd:ggfd:d:d:d:gg gd:d:d:gd:ggfd d gd d gd d gg gd d gd d gd gd ggfdVddddd:d:gdVdd:d:gg g g gdVddddgdVdd:gd:gd:gd:ggfejGd:d:d:d:d:d:d:gd:d:gd:d:gd:d:gg gd:d:d:d:d:gd:d:gd:d:gd:d:gd:d:ggej*jHdWddXgdYdZ ZIe!ddgd edd[d\d] ZJe!dd^d_d` ZKe!dd^dadb ZLe!dd^dcdG Z@e!ddgd dddde ZMe!ddfgd ddgdh ZNe!ddgd ddidj ZOdS )k    )print_functiondivisionabsolute_importN)mul)sleep)sliding_windowconcat)gen)NannyWorkerwaitworker_client)config)time)	BANDWIDTH	key_split)slowincslowaddincgen_clusterslowidentitycaptured_logger)nodebug_setup_modulenodebug_teardown_module)TOTAL_MEMORYZlinuxz Need 127.0.0.2 to mean localhost)reasonT)z	127.0.0.1   )z	127.0.0.2r      )clientncorestimeoutc             c   s`   | j dg|jdV \}| ttd|gd }t|V  t|jdksJtt|jdks\td S )N   )workers2   
   )	Z_scatteraddressmapr   ranger   lendataAssertionError)csabxfutures r1   ;lib/python3.7/site-packages/distributed/tests/test_steal.pytest_work_stealing    s
    
r3   )z	127.0.0.1r!   r   )r   r   c             #   s   t djjd jdtgV  jjdg jd}t|gV   fddtdD }t|V  t|j	j
 dkstt|jdkstt jd	kstd S )
Nnumpyi@B )r"   r!   c          	      s$   g | ]}j jd  jddqS )FT)purer"   allow_other_workers)submitsumr%   ).0i)r-   r+   npr/   r1   r2   
<listcomp>4   s   zCtest_dont_steal_expensive_data_fast_computation.<locals>.<listcomp>r$   r      )pytestimportorskipr7   aranger%   r   r8   r'   r(   who_haskeyr*   r)   )r+   r,   r-   r.   futureZcheapr1   )r-   r+   r;   r/   r2   /test_dont_steal_expensive_data_fast_computation,   s    

rD   c             c   sb   | j tddd}t|V  | jttdd|jdd}t|V  tt|jt|j dks^t	d S )Nd   g?)delayr$   T)rF   r"   r6      )
r7   r   r   r&   r'   r%   absr(   r)   r*   )r+   r,   r-   r.   r/   r0   r1   r1   r2   &test_steal_cheap_data_slow_computation<   s    

rI   c             #   s   t d} jtdd|jdtV   j|jd|jdtV   fddtd	D }t|V  t|j	j
 d
kst|jstd S )Nr4   rE   g?)rF   r"   i@B )r"   c                s   g | ]} j td ddqS )g?F)rF   r5   )r7   r   )r9   r:   )r+   r/   r1   r2   r<   R   s    z>test_steal_expensive_data_slow_computation.<locals>.<listcomp>r   r!   )r>   r?   r7   r   r%   r   r@   r'   r(   rA   rB   r*   r)   )r+   r,   r-   r.   r;   slowr1   )r+   r/   r2   *test_steal_expensive_data_slow_computationG   s    



rK   r$   c             g   s   | j tddd}|V  | jt|gd ddd}t|V  x4|j D ]&\}}dt|  k rddk sDn tqDW t|j|j	 d	kstt
tt|j d
k std S )Ng?)rF   rE   F)r5   rF   r      r!      )r7   r   r&   r   has_whatitemsr(   r*   rA   rB   r8   values)r+   r,   r"   r/   Zxswkeysr1   r1   r2   test_worksteal_many_thievesY   s    
"rT   c             c   sJ   | j tddg|jdd}t|V  t|jdks4tt|jdksFtd S )Nr!   r   T)r"   r6   r   )r&   r   r%   r   r(   r)   r*   )r+   r,   r-   r.   r0   r1   r1   r2   !test_dont_steal_unknown_functionsi   s    
rU   c             c   sL   | j ttdd|jdd}t|V  t|jdks6tt|jdksHtd S )Nr$   g?T)rF   r"   r6      )r&   r   r'   r%   r   r(   r)   r*   )r+   r,   r-   r.   r0   r1   r1   r2   'test_eventually_steal_unknown_functionsq   s
    
rW    rV   c       	      c   sp   | j ttdd|jdd}t|V  d}x6td|D ](\}}|j|j |j|j kr4|d7 }q4W |dksltd S )	Nr   g?T)rF   r"   r6   r   r   r!   r$   )	r&   r   r'   r%   r   r   rA   rB   r*   )	er,   r-   r.   r+   r0   Znearbyf1f2r1   r1   r2   test_steal_related_tasksz   s    
r\   i  c             g   s   t d}| j|jjd|d jd}ddd}t| |dV  | j|td|d	}t|V  t|j	|j
 dksvtt|j|d j d
kstd S )Nr4   i r   )r"   c             S   s   d S )Nr1   )r/   yr1   r1   r2   
do_nothing   s    z.test_dont_steal_fast_tasks.<locals>.do_nothingr!   i  )r]   i  )N)r>   r?   r7   randomr%   r   r&   r'   r(   rA   rB   r*   rO   )r+   r,   r"   r;   r/   r^   r0   r1   r1   r2   test_dont_steal_fast_tasks   s    


r`   c             c   s   t | jtdddV  | jttddd}| t|}xt|jdk rTt	dV  q8W t
|j|j|jdtd}| V  |V }|ttttdkstx,||gD ] }tdd	 |j D stqW |jst| V  d S )
Nr!   g{Gz?)rF   rE   g?r$   )loopr   memory_limitc             s   s   | ]}t |tV  qd S )N)
isinstanceint)r9   vr1   r1   r2   	<genexpr>   s    z)test_new_worker_steals.<locals>.<genexpr>)r   r7   r   r&   r'   r8   r(   
task_stater	   r   r   ipportra   r   _startr   r*   allr)   rQ   _close)r+   r,   r-   r0   totalr.   resultrR   r1   r1   r2   test_new_worker_steals   s    
 
ro   )r   r    c             c   s   t | jtdddV  | jttd|jddd}t |V  dt|j  k rTdk sZn tdt|j  k rtdk szn t| t	|}|V }|t	tt
tdkstd S )	Nr!   g?)rF   rE   T)r"   r6   rF   r   P   )r   r7   r   r&   r'   r%   r(   r)   r*   r8   r   )r+   r,   r-   r.   r0   rm   rn   r1   r1   r2   test_work_steal_no_kwargs   s    

  rq   c             c   s   | j tdd|jd}|V  | jttdd|jd}x(t|jt|j dk rZtdV  q4W t|jdksnt	t|jdkst	|j
d  }tdV  t|jdkst	t|jdkst	d S )Nr!   g?)rF   r"   rE   g{Gz?r   stealing)r7   r   r%   r&   r'   r(   rg   r	   r   r*   
extensionsbalance)r+   r,   r-   r.   rC   r0   rn   r1   r1   r2   #test_dont_steal_worker_restrictions   s    ru   )z	127.0.0.2r!   c             c   s   | j tdd|jd}|V  | jttdddd}xt|jdk rNtdV  q2W t|jdksbt	t|jdkstt	|j
d	  }tdV  t|jdkst	t|jdkst	d S )
Nr!   g?)rF   r"   rE   z	127.0.0.1r$   g{Gz?r   rr   )r7   r   r%   r&   r'   r(   rg   r	   r   r*   rs   rt   )r+   r,   r-   r.   rC   r0   rn   r1   r1   r2   !test_dont_steal_host_restrictions   s    rv   z	127.0.0.1r!   	resourcesAc             c   s   | j tdd|jd}|V  | jttddddid}xt|jdk rRtdV  q6W t|jdksft	t|jd	ksxt	|j
d
  }tdV  t|jdkst	t|jd	kst	d S )Nr!   g?)rF   r"   rE   rx   )rF   rw   r$   g{Gz?r   rr   )r7   r   r%   r&   r'   r(   rg   r	   r   r*   rs   rt   )r+   r,   r-   r.   rC   r0   rn   r1   r1   r2   %test_dont_steal_resource_restrictions   s    ry   zno stealing of resourcesc             c   s   | j tdd|jd}|V  | jttddddid}xt|jdk rRtd	V  q6W t|jdksft	t
|j|j|jddd
id}| V  t }x6|jrt|jdkrtd	V  t |d k st	qW t|jdkst	t|jdk st	| V  d S )Nr!   g?)rF   r"   rE   g?rx   )rF   rw   e   g{Gz?   )ra   r   rw   rV   r   )r7   r   r%   r&   r'   r(   rg   r	   r   r*   r   rh   ri   ra   rj   r   rl   )r+   r,   r-   rC   r0   r.   startr1   r1   r2    test_steal_resource_restrictions  s    
r}   rG   c             g   sZ   d|j d j_dd }| |td}t|V  dd |D }t|t| dk sVtd S )	Nr   rr   c             S   s   t   d }t| |S )Ng?)r_   r   )r/   r]   r1   r1   r2   rJ     s    z/test_balance_without_dependencies.<locals>.slowrE   c             S   s   g | ]}t |j qS r1   )r8   r)   rQ   )r9   rR   r1   r1   r2   r<   &  s    z5test_balance_without_dependencies.<locals>.<listcomp>rV   )	rs   _pccallback_timer&   r'   r   maxminr*   )r+   r,   r"   rJ   r0   Z	durationsr1   r1   r2   !test_balance_without_dependencies  s    
r   )z	127.0.0.1r{   c             c   sL   | j ttdd|jdd}t|V  t|jdks6tt|jdksHtd S )Nr{   g?T)rF   r"   r6   r   )r&   r   r'   r%   r   r(   r)   r*   )r+   r,   r-   r.   r0   r1   r1   r2   test_dont_steal_executing_tasks*  s
    
r   c             '   s   d|j d j_ jtdd|jdtV  d|jd<  fdd	td
D }t|V  t	|j
dksjttdd |D rtd S )Nr   rr      0i )r"   g?r   c                s   g | ]} j td ddqS )Fg?)r5   rF   )r7   r   )r9   r:   )r+   r/   r1   r2   r<   ;  s    zDtest_dont_steal_few_saturated_tasks_many_workers.<locals>.<listcomp>r   rV   c             s   s   | ]}|j V  qd S )N)rg   )r9   rR   r1   r1   r2   rf   @  s    zCtest_dont_steal_few_saturated_tasks_many_workers.<locals>.<genexpr>)rs   r~   r   r7   r   r%   r   task_durationr'   r(   r)   r*   any)r+   r,   r-   restr0   r1   )r+   r/   r2   0test_dont_steal_few_saturated_tasks_many_workers4  s    


r   rb   )r   r   Zworker_kwargsc             '   s   d|j d j_ jtdd|jdtV  d|jd<  fdd	tdD }t	 }x4t
d
d |D stdV  t	 |d k sVtqVW d S )Nr   rr   r   i)r"   g?r   c                s   g | ]} j td ddqS )Fg?)r5   rF   )r7   r   )r9   r:   )r+   r/   r1   r2   r<   K  s   z.test_steal_when_more_tasks.<locals>.<listcomp>c             s   s   | ]}|j V  qd S )N)rg   )r9   rR   r1   r1   r2   rf   O  s    z-test_steal_when_more_tasks.<locals>.<genexpr>g{Gz?r!   )rs   r~   r   r7   r   r%   r   r   r'   r   r   r	   r   r*   )r+   r,   r-   r   r0   r|   r1   )r+   r/   r2   test_steal_when_more_tasksC  s    

r   c             '   s   dd }d|j d j_ jtdd|jdtV  d|jd	< d
|jd<  fddtdD } j|ddx"t	dd |D st
dV  qrW t	fdd|D std S )Nc             S   s   t d | S )Nr!   )r   )r/   r1   r1   r2   slow2W  s    z/test_steal_more_attractive_tasks.<locals>.slow2r   rr   r   i )r"   g?r   r!   r   c                s   g | ]} j td ddqS )Fg?)r5   rF   )r7   r   )r9   r:   )r+   r/   r1   r2   r<   b  s   z4test_steal_more_attractive_tasks.<locals>.<listcomp>r$   rL   )priorityc             s   s   | ]}|j V  qd S )N)rg   )r9   rR   r1   r1   r2   rf   f  s    z3test_steal_more_attractive_tasks.<locals>.<genexpr>g{Gz?c             3   s   | ]} j |jkV  qd S )N)rB   rg   )r9   rR   )rC   r1   r2   rf   j  s    )rs   r~   r   r7   r   r%   r   r   r'   r   r	   r   r*   )r+   r,   r-   r   r   r0   r1   )r+   rC   r/   r2    test_steal_more_attractive_tasksT  s    


r   c             C   s   t d d S )Nr!   )r   )r/   r1   r1   r2   funcm  s    r   c             '   s   j d }|j  t }tt| }t }g }	xt|| D ]\}
}xt|ddD ]}|r|j	t
|g|
jdV \} j|j }|j}t| |_x(|jD ]}| j|j| 7  _qW nd}d jtt|< t
|}|jt|dt||f |
jdd| d	}|	| qVW q@W x&t jt|	k r8td
V  qW xtdD ]}|  x|jrjtd
V  qRW  fdd|D }t|dd}t|dd}tdr||krdd l }|!  ||krDd S qDW t"d#t|t|d S )Nrr   T)reverse)r"   {   r!   z%d-%dF)rB   r"   r6   r5   r   gMbP?r$   c                s*   g | ]"}t d d  j|j D ddqS )c             S   s   g | ]}t t|qS r1   )rd   r   )r9   kr1   r1   r2   r<     s    z.assert_balanced.<locals>.<listcomp>.<listcomp>T)r   )sorted
processingr%   )r9   rR   )r,   r1   r2   r<     s   z#assert_balanced.<locals>.<listcomp>z
pdb-on-errr   zExpected: {}; got: {})$rs   r~   stop	itertoolscountlistr   zipr   Zscatternextr%   tasksrB   nbytesr   rA   r   strrd   r7   r   appendr(   rprocessingr	   r   r'   rt   	in_flightr   getpdbZ	set_trace	Exceptionformat)inpexpectedr+   r,   r"   stealZcounterr   Zdata_seqr0   rR   ZtstZdatZ
old_nbyteswsr:   frn   Zresult2Z	expected2r   r1   )r,   r2   assert_balancedq  sN    







r   zinp,expectedg?r{   z/Some uncertainty based on executing stolen task)Zmarksc                s2    fdd}t ddgt d|}|  d S )Nc                 s   t  f| |S )N)r   )argskwargs)r   r   r1   r2   <lambda>  s    ztest_balance.<locals>.<lambda>T)z	127.0.0.1r!   )r   r   )r   r(   )r   r   Ztestr1   )r   r   r2   test_balance  s    5r   )r   r   r   r    c             c   s   | j ttdd|jdd}x|j|j s6tdV  qW |jd }t	dd |j
D sZtt	d	d |j D svt| jd
dV  t	dd |j
D rtt	dd |j D rtd S )NrE   g?T)rF   r"   r6   g{Gz?rr   c             s   s   | ]
}|V  qd S )Nr1   )r9   str1   r1   r2   rf     s    ztest_restart.<locals>.<genexpr>c             s   s   | ]}|D ]
}|V  q
qd S )Nr1   )r9   Lr/   r1   r1   r2   rf     s    r$   )r    c             s   s   | ]
}|V  qd S )Nr1   )r9   r/   r1   r1   r2   rf     s    c             s   s   | ]}|D ]
}|V  q
qd S )Nr1   )r9   r   r/   r1   r1   r2   rf     s    )r&   r   r'   r%   r   Zworker_addressr	   r   rs   r   Zstealable_allr*   Z	stealablerQ   Zrestart)r+   r,   r-   r.   r0   r   r1   r1   r2   test_restart  s    
r   )r   c             #   s   j d }djd< jtdtt jdjtdtt|jd fddtd	D }x&tfd
d|D st	
dV  qbW |  x|jrt	
dV  qW j|j std S )Nrr   gMbP?r   r   )r"      1c                s&   g | ]}j td d jddqS )r!   FT)rF   r5   r"   r6   )r7   r   r%   )r9   r:   )r-   r+   r/   r]   r1   r2   r<     s   z8test_steal_communication_heavy_tasks.<locals>.<listcomp>r$   c             3   s   | ]}|j  jkV  qd S )N)rB   r   )r9   r   )r,   r1   r2   rf     s    z7test_steal_communication_heavy_tasks.<locals>.<genexpr>g{Gz?)rs   r   r7   r   rd   r   r%   r'   r   r	   r   rt   r   r   r*   )r+   r,   r-   r.   r   r0   r1   )r-   r+   r,   r/   r]   r2   $test_steal_communication_heavy_tasks  s    

r   c             #   s    j td|jdtV   fddtdD }xtjdk rRtdV  q6W fddtdD }d	d |D V  t|V  t	j
}d
d | D }t|dkrtdt|t|f  ttt| dk st  V  dd |D V  d S )Nr!   )r"   c                s   g | ]} j t|d dqS )g?)rF   )r7   r   )r9   r:   )r+   r/   r1   r2   r<     s    z$test_steal_twice.<locals>.<listcomp>rE   g{Gz?c                s    g | ]}t  j j jd qS ))ra   )r   rh   ri   ra   )r9   _)r,   r1   r2   r<     s    r   c             S   s   g | ]}|  qS r1   )rj   )r9   rR   r1   r1   r2   r<     s    c             S   s   g | ]\}}t |s|qS r1   )r(   )r9   rR   rS   r1   r1   r2   r<     s    r   z,Too many workers without keys (%d out of %d)rM   c             S   s   g | ]}|  qS r1   )rl   )r9   rR   r1   r1   r2   r<     s    )r7   r   r%   r   r'   r(   r   r	   r   dictrO   rP   r>   Zfailr   r&   rQ   r*   rl   )r+   r,   r-   r.   r0   r"   rO   Zempty_workersr1   )r+   r,   r/   r2   test_steal_twice  s     



r   c             c   s   |j d }| jtdd|jd}x|js4tdV  q W ||j|j	 |j
|j |j
|j  tdV  |j	|jksxt|jrtd S )Nrr   r!   g      ?)rF   r"   g{Gz?g?)rs   r7   r   r%   	executingr	   r   Zmove_task_requestr   rB   r"   r*   )r+   r,   r-   r.   r   rC   r1   r1   r2   r   "  s    
c             #   s2  dd }|  |dV  |  tdV  | j|ddg|jdd}x(ttt j d	k rdt	d
V  q>W t
 }x8t fdd|D rt	d
V  t
 |d k sntqnW t|j}t|j}| jttd|jdd}	t	dV  t|V  xD|D ]<}
tdd ||
D tdd ||
D  dkstqW d S )Nc          	   S   s   t  }t|  W d Q R X d S )N)r   r   )rF   r+   r1   r1   r2   long3  s    z0test_dont_steal_long_running_tasks.<locals>.longg?r!   g      ?g333333?T)r"   r6   r   g{Gz?c             3   s    | ]}|j  jd  jkV  qdS )rr   N)rB   rs   Zkey_stealable)r9   r   )r,   r1   r2   rf   @  s    z5test_dont_steal_long_running_tasks.<locals>.<genexpr>rE   g?c             s   s   | ]}|d  dkV  qdS )r!   r   Nr1   )r9   logr1   r1   r2   rf   N  s    c             s   s   | ]}|d  dkV  qdS )r!   r   Nr1   )r9   r   r1   r1   r2   rf   O  s    )r7   r   r&   r%   r8   r(   r   rQ   r	   r   r   r   r*   r   r'   r   Zstory)r+   r,   r-   r.   r   Z
long_tasksr|   ZnaZnbZincsr   r1   )r,   r2   "test_dont_steal_long_running_tasks1  s&    



r   )z	127.0.0.1rG   c             #   s*  G dd dt  d|jd j_jtdddV   fdd	td
D }jt|jddd}~t	|V  j
rx|j
s|ttj
t|j
 dkstt }|j
  ||j
  ~t }x.j
s|j
rtdV  t |d k stqW |jrtt|j rtt|r&td S )Nc               @   s   e Zd ZdS )z(test_cleanup_repeated_tasks.<locals>.FooN)__name__
__module____qualname__r1   r1   r1   r2   FooT  s   r   r   rr   rL   g?)rF   c                s   g | ]}j  d jdqS )F)r5   r"   )r7   r%   )r9   r   )r   r-   r+   r1   r2   r<   Y  s    z/test_cleanup_repeated_tasks.<locals>.<listcomp>r#   Tg?)r"   r6   rF   r$   g{Gz?r!   )objectrs   r~   r   r7   r   r'   r&   r%   r   r)   r*   r(   weakrefWeakSetupdaterQ   r   r	   r   rA   r   rO   r   )r+   r,   r-   r.   Zobjectsr/   r   r|   r1   )r   r-   r+   r2   test_cleanup_repeated_tasksR  s*    
r   c          
   c   st   t dN}d|jd _x:tdD ].}| jttddd|jdd	}tdV  ~q W W d Q R X |	 }d
|kspt
d S )Nzdistributed.stealingr!   rr   rE   r$   g{Gz?FT)rF   r5   r"   r6   Error)r   Zperiodic_callbacksZintervalr'   r&   r   r%   r	   r   getvaluer*   )r+   r,   r-   r.   r   r:   r0   outr1   r1   r2   test_lose_taskq  s    
r   )PZ
__future__r   r   r   r   operatorr   r_   sysr   r   r   r>   Ztoolzr   r   Ztornador	   Zdistributedr
   r   r   r   Zdistributed.configr   Zdistributed.metricsZdistributed.schedulerr   r   Zdistributed.utils_testr   r   r   r   r   r   r   r   Zdistributed.workerr   Zsetup_moduleZteardown_moduleZmarkZskipifplatform
startswithr3   rD   rI   Zavoid_travisrK   rT   rU   rW   skipr\   r`   ro   rq   ru   rv   ry   r}   r   r   r   r   r   r   r   ZparametrizeZparamZxfailr   r   r   r   r   r   r   r1   r1   r1   r2   <module>   s    
		
6




*&!