ó
ßüÚ\c           @` s4  d  d l  m Z m Z m Z d  d l m Z m Z d  d l Z d  d l m	 Z	 d  d l
 m
 Z
 d  d l Z d d l m Z d d l m Z d d	 l m Z m Z y d  d
 l m Z Wn! e k
 r× d  d
 l m Z n Xd Z d Z e	 d ƒ Z e j e ƒ Z e j j d ƒ Z d e f d „  ƒ  YZ  d h Z! d S(   i    (   t   print_functiont   divisiont   absolute_import(   t   defaultdictt   dequeN(   t   log(   t   timei   (   t   CommClosedError(   t   SchedulerPlugin(   t
   log_errorst   PeriodicCallback(   t   topkg    „×—Ag{®Gáz„?i   s   distributed.admin.pdb-on-errt   WorkStealingc           B` s§   e  Z d  „  Z e d „  ƒ Z d d d „ Z d d d „ Z d „  Z d d d „ Z	 d „  Z
 d „  Z d „  Z d	 „  Z d d d d
 „ Z d „  Z d „  Z d „  Z RS(   c         C` sk  | |  _  g  t d ƒ D] } t ƒ  ^ q |  _ t ƒ  |  _ t ƒ  |  _ t t ƒ |  _ g  t d ƒ D] } d d | d ^ qb |  _	 d |  j	 d <x! | j
 D] } |  j d | ƒ qš Wt d |  j d d	 d
 |  j  j ƒ } | |  _ | |  j  j d <|  j  j j |  ƒ |  |  j  j d <t d d ƒ |  j  j d <d |  _ t ƒ  |  _ t d „  ƒ |  _ |  j |  j  j d <d  S(   Ni   i   i   i   i    t   workert   callbackt   callback_timeid   t   io_loopt   stealingt   maxleni † c           S` s   d S(   Ni    (    (    (    (    s3   lib/python2.7/site-packages/distributed/stealing.pyt   <lambda>:   t    s   steal-response(   t	   schedulert   ranget   sett   stealable_allt   dictt	   stealablet   key_stealableR   t   stealable_unknown_durationst   cost_multiplierst   workerst
   add_workerR
   t   balancet   loopt   _pct   periodic_callbackst   pluginst   appendt
   extensionsR   t   eventst   countt	   in_flightt   in_flight_occupancyt   move_task_confirmt   stream_handlers(   t   selfR   t   iR   t   pc(    (    s3   lib/python2.7/site-packages/distributed/stealing.pyt   __init__   s(    	%.		c         C` s   |  j  j d S(   NR   (   R   R'   (   R-   (    (    s3   lib/python2.7/site-packages/distributed/stealing.pyR   >   s    c         C` s-   g  t  d ƒ D] } t ƒ  ^ q |  j | <d  S(   Ni   (   R   R   R   (   R-   R   R   R.   (    (    s3   lib/python2.7/site-packages/distributed/stealing.pyR   B   s    c         C` s   |  j  | =d  S(   N(   R   (   R-   R   R   (    (    s3   lib/python2.7/site-packages/distributed/stealing.pyt   remove_workerE   s    c         C` s   |  j  j ƒ  d  S(   N(   R"   t   stop(   R-   (    (    s3   lib/python2.7/site-packages/distributed/stealing.pyt   teardownH   s    c   
      O` s¿   |  j  j | } | d k r, |  j | ƒ n  | d k r» |  j | ƒ | d k r¥ xd |  j j | j d ƒ D]4 }	 |	 |  j k rj |	 j d k rj |  j |	 ƒ qj qj Wq» |  j j | d  ƒ n  d  S(   Nt
   processingt   memory(    (
   R   t   taskst   put_key_in_stealablet   remove_key_from_stealableR   t   popt   prefixR)   t   statet   None(
   R-   t   keyt   startt   finisht   compute_startt   compute_stopt   argst   kwargst   tst   tts(    (    s3   lib/python2.7/site-packages/distributed/stealing.pyt
   transitionK   s    c         C` s˜   | j  } | j } |  j | ƒ \ } } |  j j d | j | | f ƒ | d  k	 r” |  j | j | ƒ |  j	 | | j | ƒ | | f |  j
 | <n  d  S(   Ns   add-stealable(   t   processing_ont   addresst   steal_time_ratioR   R%   R=   R<   R   t   addR   R   (   R-   RD   t   wsR   t   cost_multipliert   level(    (    s3   lib/python2.7/site-packages/distributed/stealing.pyR7   [   s    		c         C` s°   |  j  j | d  ƒ } | d  k r% d  S| \ } } |  j j d | j | | f ƒ y |  j | | j | ƒ Wn t k
 r n Xy |  j	 | j | ƒ Wn t k
 r« n Xd  S(   Ns   remove-stealable(
   R   R9   R<   R   R%   R=   R   t   removet   KeyErrorR   (   R-   RD   t   resultR   RM   (    (    s3   lib/python2.7/site-packages/distributed/stealing.pyR8   e   s    c   	      C` s  | j  r) | j s% | j s% | j r) d S| j s6 d	 St d „  | j Dƒ ƒ } | t t } | j	 } | t
 k rv d
 S| j } | d k r£ |  j | j | ƒ d S| j | } | d k  rÀ d S| | } | d k rÚ d St t t | ƒ t d d ƒ ƒ } t d | ƒ } | | f Sd S(   s=   The compute to communication time ratio of a key

        Returns
        -------

        cost_multiplier: The increased cost from moving this task as a factor.
        For example a result of zero implies a task without dependencies.
        level: The location within a stealable list to place this value
        i    c         s` s   |  ] } | j  ƒ  Vq d  S(   N(   t
   get_nbytes(   t   .0t   dep(    (    s3   lib/python2.7/site-packages/distributed/stealing.pys	   <genexpr>‡   s    g{®Gázt?id   i   i   N(   NN(   i    i    (   NN(   NN(   NN(   NN(   t   loose_restrictionst   host_restrictionst   worker_restrictionst   resource_restrictionsR<   t   dependenciest   sumt	   BANDWIDTHt   LATENCYR:   t
   fast_tasksRG   R   RJ   R4   t   intt   roundR   t   log_2t   max(	   R-   RD   t   nbytest   transfer_timet   splitRK   t   compute_timeRL   RM   (    (    s3   lib/python2.7/site-packages/distributed/stealing.pyRI   u   s.    

			
#c   	      C` s…  y|  j  j r: | | j k	 r: d d  l } | j ƒ  q: n  | j } |  j | ƒ t j d | | | j	 | | j	 ƒ | j
 | } |  j  j | ƒ |  j  j | | ƒ } |  j  j | j j i d d 6| d 6ƒ i | d 6| d 6| d 6| d	 6|  j | <|  j | c | 8<|  j | c | 7<Wnb t k
 r?t j d
 | ƒ nB t k
 r€} t j | ƒ t rzd d  l } | j ƒ  n  ‚  n Xd  S(   Ni    s#   Request move %s, %s: %2f -> %s: %2fs   steal-requestt   opR=   t   victimt   thieft   victim_durationt   thief_durations%   Worker comm closed while stealing: %s(   R   t   validateRG   t   pdbt	   set_traceR=   R8   t   loggert   debugt	   occupancyR4   t   get_task_durationt   get_comm_costt   stream_commsRH   t   sendR)   R*   R   t   infot	   Exceptiont	   exceptiont   LOG_PDB(	   R-   RD   Rf   Rg   Rk   R=   Rh   Ri   t   e(    (    s3   lib/python2.7/site-packages/distributed/stealing.pyt   move_task_request   sF    	
	c         C` sÛ  zƒy:y |  j  j | } Wn" t k
 r> t j d | ƒ d  SXy |  j j | ƒ } Wn t k
 ri d  SX| d } | d } t j d | | | | ƒ |  j | c | d 8<|  j | c | d 7<|  j sã t d „  ƒ |  _ n  | j	 d k s| j
 | k	 rq| j } t | j j ƒ  ƒ }	 | j }
 t | j j ƒ  ƒ } |	 | _ | | _ |  j  j |	 | | |
 7_ d  S| j |  j  j k s›| j |  j  j k r¯|  j  j | ƒ d  S| d k r |  j j d | | j | j f ƒ |  j  j | ƒ |  j  j | ƒ n<| d k r,|  j | ƒ | | _
 | j j | ƒ } | j | 8_ |  j  j | 8_ | j s|  j  j | j 8_ d | _ n  | d | j | <| j | d 7_ |  j  j | d 7_ |  j | ƒ y |  j  j | j | ƒ Wn$ t k
 r|  j  j | j ƒ n X|  j j d | | j | j f ƒ n t d | ƒ ‚ WnB t k
 r} t j | ƒ t r{d d  l  } | j! ƒ  n  ‚  n XWd  y |  j  j | ƒ Wn t k
 r­n Xy |  j  j | ƒ Wn t k
 rÕn XXd  S(   Ns,   Key released between request and confirm: %sRg   Rf   s%   Confirm move %s, %s -> %s.  State: %sRi   Rh   c           S` s   d S(   Ni    (    (    (    (    s3   lib/python2.7/site-packages/distributed/stealing.pyR   â   R   R4   R5   t	   executings   long-runnings   already-computingt   waitingt   readyi    t   confirms   Unexpected task state: %s(   R5   Rz   s   long-runningN(   R{   R|   ("   R   R6   RO   Rm   Rn   R)   R9   R*   R   R;   RG   Ro   RY   R4   t   valuest   total_occupancyRH   R   t
   rescheduleR<   R   R%   t   check_idle_saturatedR8   R7   t   send_task_to_workerR   R1   t
   ValueErrorRu   Rv   Rw   Rk   Rl   (   R-   R=   R   R;   RD   t   dRg   Rf   t	   old_thieft	   new_thieft
   old_victimt
   new_victimt   durationRx   Rk   (    (    s3   lib/python2.7/site-packages/distributed/stealing.pyR+   Í   sŽ     

					
			%c         ` sØ  ˆ j  ‰ ‡ f d †  ‰  ‡  ‡ ‡ ‡ ‡ f d †  } t ƒ  –d } ˆ j } ˆ j } | sw t | ƒ t ˆ j ƒ k r{ d  Sg  ‰ t ƒ  ‰ ˆ j s÷ t d ˆ j j ƒ  d ˆ  ƒ} g  | D]6 } ˆ  | ƒ d k r¸ t | j	 ƒ | j
 k r¸ | ^ q¸ } n0 t ˆ j ƒ d k  r't | d ˆ  d t ƒ} n  t | ƒ d k  rNt | d ˆ  ƒ} n  x(t ˆ j ƒ D]\ } } | stPn  xö t | ƒ D]è } ˆ j | j | }	 |	 s| r¯qn  x· t |	 ƒ D]© }
 |
 ˆ j k sà|
 j | k	 ró|	 j |
 ƒ q¼n  | d	 7} | sPn  | | t | ƒ } | j	 j |
 ƒ } | d  k rL|	 j |
 ƒ q¼n  | | |
 | | | | ƒ q¼WqWˆ j | d k  r^ˆ j | }	 xå t |	 ƒ D]Ô }
 | sªPn  |
 ˆ j k rÌ|	 j |
 ƒ qšn  |
 j } | d  k rô|	 j |
 ƒ qšn  ˆ  | ƒ d k  rqšn  t | j	 ƒ | j
 k r*qšn  | d	 7} | | t | ƒ } | j	 |
 } | | |
 | | | | ƒ qšWq^q^Wˆ r¡ˆ j j ˆ ƒ ˆ j d	 7_ n  t ƒ  } ˆ j rÎˆ j d
 j | ˆ ƒ n  Wd  QXd  S(   Nc         ` s   |  j  ˆ  j |  S(   N(   Ro   R*   (   RK   (   R-   (    s3   lib/python2.7/site-packages/distributed/stealing.pyt   combined_occupancy*  s    c      	   ` s¢   ˆ  | ƒ } ˆ  | ƒ } | | | | | d k rž ˆ j  | | | ƒ ˆ j ˆ |  | j | | j | | j | f ƒ ˆ j | d | ƒˆ j | d | ƒn  d  S(   Ni   t   occ(   Ry   R%   R=   RH   R   (   RM   RD   t   satt   idlR‰   RL   t   occ_idlt   occ_sat(   RŠ   R   t   sR-   R>   (    s3   lib/python2.7/site-packages/distributed/stealing.pyt   maybe_move_task-  s    
i    i
   R=   gš™™™™™É?i   t   reversei   s   steal-duration(   R   R	   t   idlet	   saturatedt   lenR   R   R   R~   R4   t   ncorest   sortedt   Truet	   enumerateR   t   listR   RH   R   RG   t   discardt   getR<   R   R   R%   R(   t   digestsRJ   (   R-   R‘   R.   R“   R”   RK   RM   RL   RŒ   R   RD   R   R‰   R2   (    (   RŠ   R   R   R-   R>   s3   lib/python2.7/site-packages/distributed/stealing.pyR    '  s„    	
		"		
9
!	
$		c         C` sq   x2 |  j  j ƒ  D]! } x | D] } | j ƒ  q Wq Wx |  j D] } | j ƒ  q? W|  j j ƒ  |  j j ƒ  d  S(   N(   R   R~   t   clearR   R   R   (   R-   R   R   R   (    (    s3   lib/python2.7/site-packages/distributed/stealing.pyt   restart  s    c         ` s‚   t  ˆ  ƒ ‰  g  } xi |  j D]^ } t | t ƒ s= | g } n  x: | D]2 } t ‡  f d †  | Dƒ ƒ rD | j | ƒ qD qD Wq W| S(   Nc         3` s   |  ] } | ˆ  k Vq d  S(   N(    (   RR   t   x(   t   keys(    s3   lib/python2.7/site-packages/distributed/stealing.pys	   <genexpr>   s    (   R   R   t
   isinstanceRš   t   anyR%   (   R-   R¡   t   outt   Lt   t(    (   R¡   s3   lib/python2.7/site-packages/distributed/stealing.pyt   story™  s    N(   t   __name__t
   __module__R0   t   propertyR   R<   R   R1   R3   RF   R7   R8   RI   Ry   R+   R    RŸ   R§   (    (    (    s3   lib/python2.7/site-packages/distributed/stealing.pyR      s   	!		
		(	0Z	h	
s   shuffle-split("   t
   __future__R    R   R   t   collectionsR   R   t   loggingt   mathR   R   t   daskt   coreR   t   diagnostics.pluginR   t   utilsR	   R
   t   cytoolzR   t   ImportErrort   toolzRZ   R[   R_   t	   getLoggerR¨   Rm   t   configRœ   Rw   R   R\   (    (    (    s3   lib/python2.7/site-packages/distributed/stealing.pyt   <module>   s(   ÿ Š