σ
ίόΪ\c           @` s}  d  d l  m Z m Z m Z d  d l Z d  d l m Z m Z d  d l m	 Z	 d d l
 m Z d d l m Z m Z d d	 l m Z d d
 l m Z m Z d d l m Z e j e  Z d   Z e	 j d    Z d   Z d d d  Z d   Z i d d 6d d 6d d 6d d 6e d 6Z i d d 6d d 6d d 6d d 6d d 6Z  i d d 6d d 6d d 6d d 6d  d 6Z! d!   Z" d S("   i    (   t   print_functiont   divisiont   absolute_importN(   t   valmapt   merge(   t   geni   (   t   AllProgressi   (   t   connectt   coerce_to_address(   t	   Scheduler(   t	   key_splitt   color_of(   t   dumps_functionc         ` sF   t  i t t   j  d 6  j d 6  f d   d d d d g D  S(   Nt   allt   nbytesc         ` s)   i  |  ] } t  t   j |  |  q S(    (   R   t   lent   state(   t   .0R   (   t   allprogress(    sF   lib/python2.7/site-packages/distributed/diagnostics/progress_stream.pys
   <dictcomp>   s   	t   memoryt   erredt   releasedt
   processing(   R   R   R   R   R   (   t	   schedulerR   (    (   R   sF   lib/python2.7/site-packages/distributed/diagnostics/progress_stream.pyt   counts   s     c         c` sr   t  |   }  t |   V} | j i d d 6t t  d 6t t  d 6| d 6t t j  d 6 Vt j	 |   d S(   s#   Open a TCP connection to scheduler, receive progress messages

    The messages coming back are dicts containing counts of key groups::

        {'inc': {'all': 5, 'memory': 2, 'erred': 0, 'released': 1},
         'dec': {'all': 1, 'memory': 0, 'erred': 0, 'released': 0}}

    Parameters
    ----------
    address: address of scheduler
    interval: time between batches, in seconds

    Examples
    --------
    >>> stream = yield eventstream('127.0.0.1:8786', 0.100)  # doctest: +SKIP
    >>> print(yield read(stream))  # doctest: +SKIP
    t   feedt   opt   setupt   functiont   intervalt   teardownN(
   R   R   t   writeR   R   R   R	   t   remove_pluginR   t   Return(   t   addressR   t   comm(    (    sF   lib/python2.7/site-packages/distributed/diagnostics/progress_stream.pyt   progress_stream   s    c         C` sv  t  |  j    } t |   } i g  d 6g  d 6g  d 6g  d 6g  d 6g  d 6g  d 6g  d 6} | sf | Sd	 } x| D]ϋ } | } |  | | | } | | d
 } | d j |  | d  | d j t |  | | d d
   | d j |  | d j |  | d j |  | d j t |   | d j |  | | d k r]| d j |  qs | d j d  qs W| S(   sχ    Convert nbytes message into rectangle placements

    >>> nbytes_bar({'inc': 1000, 'dec': 3000}) # doctest: +NORMALIZE_WHITESPACE
    {'names': ['dec', 'inc'],
     'left': [0, 0.75],
     'center': [0.375, 0.875],
     'right': [0.75, 1.0]}
    t   namet   textt   leftt   rightt   centert   colort   percentt   MBi    i   i@B id   gΉ?t    (   t   sumt   valuest   sortedt   appendt   roundR   (   R   t   totalt   namest   dR(   R%   R'   R)   (    (    sF   lib/python2.7/site-packages/distributed/diagnostics/progress_stream.pyt
   nbytes_bar>   s:    	
&i   i   c         ` sΒ  d } t  |  d d |  d j d t     | |    t    }   f d   |  j   D }   | d <g    D], } t |  d k r | n | d  d	 ^ qp | d
 <g  t |  D] } | | ^ q³ | d <g  t |  D] } | | | ^ qΪ | d <g  t |  D] } | | ^ q| d <g  t |  D] } | | d ^ q-| d <g    D] } t |  ^ qS| d <g  | d <g  | d <g  | d <g  | d <g  | d <xt | d | d | d | d | d | d  D]ε \ } }	 }
 } } } | | | | } | | |	 | | } | | |	 |
 | | } | | | |	 |
 | | } d | |	 |
 | f } | d j |  | d j |  | d j |  | d j |  | d j |  qΥW| S(   s<  

    >>> msg = {'all': {'inc': 5, 'dec': 1, 'add': 4},
    ...        'memory': {'inc': 2, 'dec': 0, 'add': 1},
    ...        'erred': {'inc': 0, 'dec': 1, 'add': 0},
    ...        'released': {'inc': 1, 'dec': 0, 'add': 1},
    ...        'processing': {'inc': 1, 'dec': 0, 'add': 2}}

    >>> progress_quads(msg, nrows=2)  # doctest: +SKIP
    {'name': ['inc', 'add', 'dec'],
     'left': [0, 0, 1],
     'right': [0.9, 0.9, 1.9],
     'top': [0, -1, 0],
     'bottom': [-.8, -1.8, -.8],
     'released': [1, 1, 0],
     'memory': [2, 1, 0],
     'erred': [0, 0, 1],
     'processing': [1, 0, 2],
     'done': ['3 / 5', '2 / 4', '1 / 1'],
     'released-loc': [.2/.9, .25 / 0.9, 1],
     'memory-loc': [3 / 5 / .9, .5 / 0.9, 1],
     'erred-loc': [3 / 5 / .9, .5 / 0.9, 1.9],
     'processing-loc': [4 / 5, 1 / 1, 1]}}
    gΝΜΜΜΜΜμ?R   t   keyt   reversec         ` s>   i  |  ]4 \ } } g    D] } | j  | d   ^ q |  q S(   i    (   t   get(   R   t   kt   vR%   (   R4   (    sF   lib/python2.7/site-packages/distributed/diagnostics/progress_stream.pys
   <dictcomp>   s   	 R%   i   i   s   ...s	   show-nameR'   R(   t   topgι?t   bottomR*   s   released-locs
   memory-locs	   erred-locs   processing-loct   doneR   R   R   R   s   %d / %d(	   R0   R9   t   TrueR   t   itemst   rangeR   t   zipR1   (   t   msgt   nrowst   ncolst   widtht   nR5   R%   t   it   rt   mt   et   pt   at   lt   rlt   mlt   elt   plR>   (    (   R4   sF   lib/python2.7/site-packages/distributed/diagnostics/progress_stream.pyt   progress_quadsl   s<    #
='+(,#




Fc         C` s2   |  d d k r* t  |  d  } t |  Sd Sd  S(   Nt   statust   OKR7   t   black(   R
   R   (   RC   t   split(    (    sF   lib/python2.7/site-packages/distributed/diagnostics/progress_stream.pyt   color_of_message©   s    
t   redt   transfert   oranges
   disk-writes	   disk-readt   grayt   deserializet   computegΩ?s	   transfer-s   disk-write-s
   disk-read-s   deserialize-R-   c         C` ss  | d } t  |  } | j d g   } x>| D]6\ } } } t | }	 t |	  t k	 ri |	 |  }	 n  |  d j | | d d  |  d j d | |  |  d j |  |  d j t | |  |  d j |	  |  d	 j t |  |  d
 j | d
  d | d
 | d f }
 |  d j |
  |
 | k rPt |  d | |
 <n  |  d j | |
  q/ Wt |  S(   NR7   t
   startstopst   starti   iθ  t   durationR%   R*   t   alphat   workers   %s-%dt   threadt   worker_threadt   y(	   R
   R9   t   colorst   typet   strR1   t   prefixt   alphasR   (   t   listsRC   t   workersR7   R%   R_   t   actionR`   t   stopR*   Re   (    (    sF   lib/python2.7/site-packages/distributed/diagnostics/progress_stream.pyt   task_stream_appendΜ   s(    

(#   t
   __future__R    R   R   t   loggingt   toolzR   R   t   tornadoR   t   progressR   t   coreR   R   R   R	   t   utilsR
   R   Rc   R   t	   getLoggert   __name__t   loggerR   t	   coroutineR$   R6   RS   RX   Rg   Rk   Rj   Rp   (    (    (    sF   lib/python2.7/site-packages/distributed/diagnostics/progress_stream.pyt   <module>   sB   	
!	.=	


