B
    °F.\¢8  ã               @   sæ   d dl mZmZmZ d dlmZmZ d dlZd dlm	Z	 d dl
m
Z
 d dlZddlmZ ddlmZ dd	lmZmZ yd d
lmZ W n  ek
r¤   d d
lmZ Y nX dZdZe	dƒZe e¡Zej d¡ZG dd„ deƒZ dhZ!dS )é    )Úprint_functionÚdivisionÚabsolute_import)ÚdefaultdictÚdequeN)Úlog)Útimeé   )ÚCommClosedError)ÚSchedulerPlugin)Ú
log_errorsÚPeriodicCallback)Útopkg    „×—Ag{®Gáz„?é   zdistributed.admin.pdb-on-errc               @   sˆ   e Zd Zdd„ Zedd„ ƒZddd„Zddd	„Zd
d„ Zd dd„Z	dd„ Z
dd„ Zdd„ Zdd„ Zd!dd„Zdd„ Zdd„ Zdd„ ZdS )"ÚWorkStealingc             C   sî   || _ dd„ tdƒD ƒ| _tƒ | _tƒ | _ttƒ| _dd„ tdƒD ƒ| _	d| j	d< x|j
D ]}| j|d qZW t| jd| j jd	}|| _|| j jd
< | j j | ¡ | | j jd
< tdd| j jd
< d| _tƒ | _tdd„ ƒ| _| j| j jd< d S )Nc             S   s   g | ]
}t ƒ ‘qS © )Úset)Ú.0Úir   r   ú3lib/python3.7/site-packages/distributed/stealing.pyú
<listcomp>    s    z)WorkStealing.__init__.<locals>.<listcomp>é   c             S   s   g | ]}d d|d   ‘qS )r	   r   é   r   )r   r   r   r   r   r   (   s    r	   r   )Úworkeréd   )ÚcallbackZcallback_timeZio_loopÚstealingi † )Úmaxlenc               S   s   dS )Nr   r   r   r   r   r   Ú<lambda>:   s    z'WorkStealing.__init__.<locals>.<lambda>zsteal-response)Ú	schedulerÚrangeÚstealable_allÚdictÚ	stealableÚkey_stealabler   r   Ústealable_unknown_durationsÚcost_multipliersÚworkersÚ
add_workerr   ÚbalanceZloopÚ_pcZperiodic_callbacksZpluginsÚappendÚ
extensionsr   ÚeventsÚcountÚ	in_flightÚin_flight_occupancyÚmove_task_confirmZstream_handlers)Úselfr   r   Zpcr   r   r   Ú__init__   s*    

zWorkStealing.__init__c             C   s   | j jd S )Nr   )r   r-   )r2   r   r   r   r   >   s    zWorkStealing.logNc             C   s   dd„ t dƒD ƒ| j|< d S )Nc             S   s   g | ]
}t ƒ ‘qS r   )r   )r   r   r   r   r   r   C   s    z+WorkStealing.add_worker.<locals>.<listcomp>r   )r    r#   )r2   r   r   r   r   r   r(   B   s    zWorkStealing.add_workerc             C   s   | j |= d S )N)r#   )r2   r   r   r   r   r   Úremove_workerE   s    zWorkStealing.remove_workerc             C   s   | j  ¡  d S )N)r*   Ústop)r2   r   r   r   ÚteardownH   s    zWorkStealing.teardownc       
      O   s„   | j j| }|dkr|  |¡ |dkr€|  |¡ |dkrrxF| j |jd¡D ]"}	|	| jkrJ|	jdkrJ|  |	¡ qJW n| j |d ¡ d S )NÚ
processingÚmemoryr   )	r   ÚtasksÚput_key_in_stealableÚremove_key_from_stealabler%   ÚpopÚprefixr/   Ústate)
r2   ÚkeyÚstartZfinishZcompute_startZcompute_stopÚargsÚkwargsÚtsZttsr   r   r   Ú
transitionK   s    

zWorkStealing.transitionc             C   sn   |j }|j}|  |¡\}}| j d|j||f¡ |d k	rj| j|  |¡ | j| |  |¡ ||f| j	|< d S )Nzadd-stealable)
Úprocessing_onÚaddressÚsteal_time_ratior   r+   r?   r!   Úaddr#   r$   )r2   rC   Úwsr   Úcost_multiplierÚlevelr   r   r   r:   Z   s    z!WorkStealing.put_key_in_stealablec             C   s”   | j  |d ¡}|d krd S |\}}| j d|j||f¡ y| j| |  |¡ W n tk
rd   Y nX y| j|  |¡ W n tk
rŽ   Y nX d S )Nzremove-stealable)	r$   r<   r   r+   r?   r#   ÚremoveÚKeyErrorr!   )r2   rC   Úresultr   rK   r   r   r   r;   d   s    z&WorkStealing.remove_key_from_stealablec       	      C   sÔ   |j s|js|js|jrdS |js&dS tdd„ |jD ƒƒ}|t t }|j}|t	krXdS |j
}|dkrz| j|  |¡ dS |j| }|dk rdS || }|dkr¤dS ttt|ƒt d d	ƒƒ}td
|ƒ}||fS dS )a=   The compute to communication time ratio of a key

        Returns
        -------

        cost_multiplier: The increased cost from moving this task as a factor.
        For example a result of zero implies a task without dependencies.
        level: The location within a stealable list to place this value
        )NN)r   r   c             s   s   | ]}|  ¡ V  qd S )N)Z
get_nbytes)r   Zdepr   r   r   ú	<genexpr>†   s    z0WorkStealing.steal_time_ratio.<locals>.<genexpr>Ng{®Gázt?r   r   r   r	   )Zloose_restrictionsZhost_restrictionsZworker_restrictionsZresource_restrictionsZdependenciesÚsumÚ	BANDWIDTHÚLATENCYr=   Ú
fast_tasksrE   r%   rH   r7   ÚintÚroundr   Úlog_2Úmax)	r2   rC   ÚnbytesZtransfer_timeÚsplitrI   Zcompute_timerJ   rK   r   r   r   rG   t   s0    


zWorkStealing.steal_time_ratioc       	   
   C   s.  yÄ| j jr$||jk	r$dd l}| ¡  |j}|  |¡ t d|||j	||j	¡ |j
| }| j  |¡| j  ||¡ }| j j|j  d|dœ¡ ||||dœ| j|< | j|  |8  < | j|  |7  < W nd tk
rä   t d|¡ Y nF tk
r( } z&t |¡ trdd l}| ¡  ‚ W d d }~X Y nX d S )Nr   z#Request move %s, %s: %2f -> %s: %2fzsteal-request)Úopr?   )ÚvictimÚthiefÚvictim_durationÚthief_durationz%Worker comm closed while stealing: %s)r   ZvalidaterE   ÚpdbÚ	set_tracer?   r;   ÚloggerÚdebugÚ	occupancyr7   Zget_task_durationZget_comm_costZstream_commsrF   Úsendr/   r0   r
   ÚinfoÚ	ExceptionÚ	exceptionÚLOG_PDB)	r2   rC   r[   r\   r_   r?   r]   r^   Úer   r   r   Úmove_task_requestœ   s:    





zWorkStealing.move_task_requestc             C   s@  zæy˜y| j j| }W n  tk
r8   t d|¡ d S X y| j |¡}W n tk
r^   d S X |d }|d }t d||||¡ | j|  |d 8  < | j|  |d 7  < | jsÂtdd„ ƒ| _|j	d	ksØ|j
|k	r,|j}t|j ¡ ƒ}	|j}
t|j ¡ ƒ}|	|_||_| j  j|	| | |
 7  _d S |j| j jksL|j| j jkr\| j  |¡ d S |d
krš| j d||j|jf¡ | j  |¡ | j  |¡ n|dkr|  |¡ ||_
|j |¡}| j|8  _| j  j|8  _|jsþ| j  j|j8  _d|_|d |j|< | j|d 7  _| j  j|d 7  _|  |¡ y| j  |j|¡ W n$ tk
rt   | j  |j¡ Y nX | j d||j|jf¡ ntd| ƒ‚W nF tk
rä } z&t |¡ trÒdd l}|  ¡  ‚ W d d }~X Y nX W d y| j  |¡ W n tk
r   Y nX y| j  |¡ W n tk
r8   Y nX X d S )Nz,Key released between request and confirm: %sr\   r[   z%Confirm move %s, %s -> %s.  State: %sr^   r]   c               S   s   dS )Nr   r   r   r   r   r   r   ×   s    z0WorkStealing.move_task_confirm.<locals>.<lambda>r7   )r8   Z	executingzlong-runningNzalready-computing)ZwaitingZreadyr   ZconfirmzUnexpected task state: %s)!r   r9   rM   ra   rb   r/   r<   r0   r   r>   rE   rc   rP   r7   ÚvaluesZtotal_occupancyrF   r'   Z
rescheduler   r+   Úcheck_idle_saturatedr;   r:   Zsend_task_to_workerr
   r4   Ú
ValueErrorrf   rg   rh   r_   r`   )r2   r?   r   r>   rC   Údr\   r[   Z	old_thiefZ	new_thiefZ
old_victimZ
new_victimÚdurationri   r_   r   r   r   r1   Ã   sŒ    




zWorkStealing.move_task_confirmc                s¾  ˆj ‰‡fdd„‰ ‡ ‡‡‡‡fdd„}tƒ † d}ˆj}ˆj}|rVt|ƒtˆjƒkrZd S g ‰tƒ ‰ˆjs’tdˆj ¡ ˆ d}‡ fdd	„|D ƒ}ntˆjƒd
k r®t	|ˆ dd}t|ƒd
k rÆt	|ˆ d}x¤t
ˆjƒD ]”\}}|säP x¼t|ƒD ]°}ˆj|j | }|rî|sqîxŽt|ƒD ]‚}	|	ˆjks4|	j|k	rB| |	¡ q|d7 }|sRP ||t|ƒ  }
|j |	¡}|d kr†| |	¡ q|||	||
||ƒ qW qîW ˆj| d
k rÔˆj| }x®t|ƒD ]¢}	|sÐP |	ˆjkrê| |	¡ qÄ|	j}|d kr| |	¡ qÄˆ |ƒdk rqÄt|jƒ|jkr0qÄ|d7 }||t|ƒ  }
|j|	 }|||	||
||ƒ qÄW qÔW ˆrŽˆj ˆ¡ ˆ jd7  _tƒ }ˆjr°ˆjd  |ˆ ¡ W d Q R X d S )Nc                s   | j ˆ j|   S )N)rc   r0   )rI   )r2   r   r   Úcombined_occupancy  s    z0WorkStealing.balance.<locals>.combined_occupancyc          
      sv   ˆ |ƒ}ˆ |ƒ}|||  ||d  krrˆ  |||¡ ˆ ˆ| |j||j||j|f¡ ˆj||d ˆj||d d S )Nr   )Zocc)rj   r+   r?   rF   rl   )rK   rC   ÚsatÚidlro   rJ   Zocc_idlZocc_sat)rp   r   Úsr2   r@   r   r   Úmaybe_move_task  s    z-WorkStealing.balance.<locals>.maybe_move_taskr   é
   )r?   c                s,   g | ]$}ˆ |ƒd krt |jƒ|jkr|‘qS )gš™™™™™É?)Úlenr7   Úncores)r   rI   )rp   r   r   r   5  s    z(WorkStealing.balance.<locals>.<listcomp>é   T)r?   Úreverser	   gš™™™™™É?zsteal-duration)r   r   ÚidleÚ	saturatedrv   r'   r   r   rk   ÚsortedÚ	enumerater&   Úlistr#   rF   r$   rE   Údiscardr7   Úgetr!   rw   r   r+   r.   ZdigestsrH   )r2   rt   r   rz   r{   rK   rJ   rq   r#   rC   rr   ro   r5   r   )rp   r   rs   r2   r@   r   r)     s†    











zWorkStealing.balancec             C   sZ   x(| j  ¡ D ]}x|D ]}| ¡  qW qW x| jD ]}| ¡  q2W | j ¡  | j ¡  d S )N)r#   rk   Úclearr!   r$   r%   )r2   r   r#   rs   r   r   r   Úrestartw  s    

zWorkStealing.restartc                s^   t ˆ ƒ‰ g }xL| jD ]B}t|tƒs(|g}x,|D ]$}t‡ fdd„|D ƒƒr.| |¡ q.W qW |S )Nc             3   s   | ]}|ˆ kV  qd S )Nr   )r   Úx)Úkeysr   r   rO   ˆ  s    z%WorkStealing.story.<locals>.<genexpr>)r   r   Ú
isinstancer~   Úanyr+   )r2   r„   ÚoutÚLÚtr   )r„   r   Ústory  s    

zWorkStealing.story)NN)NN)NN)NNN)Ú__name__Ú
__module__Ú__qualname__r3   Úpropertyr   r(   r4   r6   rD   r:   r;   rG   rj   r1   r)   r‚   rŠ   r   r   r   r   r      s   !

 

('
T`
r   zshuffle-split)"Z
__future__r   r   r   Úcollectionsr   r   ZloggingZmathr   r   ZdaskZcorer
   Zdiagnostics.pluginr   Zutilsr   r   Zcytoolzr   ÚImportErrorZtoolzrQ   rR   rV   Z	getLoggerr‹   ra   Zconfigr€   rh   r   rS   r   r   r   r   Ú<module>   s*   
  s