B
    F.\%                 @   s   d dl mZmZmZ d dlZd dlmZmZ d dlm	Z	 ddl
mZ ddlmZmZ dd	lmZ dd
lmZmZ ddlmZ eeZdd Ze	jdd Zdd Zd%ddZdd ZddddedZddddddZ ddd d!d"dZ!d#d$ Z"dS )&    )print_functiondivisionabsolute_importN)valmapmerge)gen   )AllProgress   )connectcoerce_to_address)	Scheduler)	key_splitcolor_of)dumps_functionc                s(   t tt j jd fdddD S )N)allnbytesc                s   i | ]}t t j| |qS  )r   lenstate).0r   )allprogressr   Flib/python3.7/site-packages/distributed/diagnostics/progress_stream.py
<dictcomp>   s   zcounts.<locals>.<dictcomp>)memoryerredreleased
processing)r   r   r   r   r   )	schedulerr   r   )r   r   counts   s    
r   c             c   sF   t | } t| V }|dtttt|ttjdV  t	|dS )a#   Open a TCP connection to scheduler, receive progress messages

    The messages coming back are dicts containing counts of key groups::

        {'inc': {'all': 5, 'memory': 2, 'erred': 0, 'released': 1},
         'dec': {'all': 1, 'memory': 0, 'erred': 0, 'released': 0}}

    Parameters
    ----------
    address: address of scheduler
    interval: time between batches, in seconds

    Examples
    --------
    >>> stream = yield eventstream('127.0.0.1:8786', 0.100)  # doctest: +SKIP
    >>> print(yield read(stream))  # doctest: +SKIP
    Zfeed)opZsetupZfunctionintervalZteardownN)
r   r   writer   r	   r   r   Zremove_pluginr   ZReturn)Zaddressr!   Zcommr   r   r   progress_stream   s    
r#   c          	   C   s  t |  }t| }g g g g g g g g d}|s2|S d}x|D ]}|}| | | | }|| d }|d | | d  |d t| | | d d |d | |d	 | |d
 | |d t| |d | || dkr|d | q<|d d q<W |S )z Convert nbytes message into rectangle placements

    >>> nbytes_bar({'inc': 1000, 'dec': 3000}) # doctest: +NORMALIZE_WHITESPACE
    {'names': ['dec', 'inc'],
     'left': [0, 0.75],
     'center': [0.375, 0.875],
     'right': [0.75, 1.0]}
    )nametextleftrightcentercolorpercentMBr   r
   r+   i@B r*   d   r&   r'   r(   r)   r$   g?r%    )sumvaluessortedappendroundr   )r   Ztotalnamesdr'   r$   r&   r(   r   r   r   
nbytes_bar7   s8    	
 r5         c                s   dt | d | d jdd  d|   t } fdd|  D } |d< d	d
  D |d< fdd
t|D |d< fdd
t|D |d< fdd
t|D |d< fdd
t|D |d< dd
  D |d< g |d< g |d< g |d< g |d< g |d< xt|d |d |d |d |d |d D ]\}}}}}	}
| |	 |
 }||  |	 |
 }|| |  |	 |
 }|| | |  |	 |
 }d|| | |	f }|d | |d | |d | |d | |d | q4W |S ) a<  

    >>> msg = {'all': {'inc': 5, 'dec': 1, 'add': 4},
    ...        'memory': {'inc': 2, 'dec': 0, 'add': 1},
    ...        'erred': {'inc': 0, 'dec': 1, 'add': 0},
    ...        'released': {'inc': 1, 'dec': 0, 'add': 1},
    ...        'processing': {'inc': 1, 'dec': 0, 'add': 2}}

    >>> progress_quads(msg, nrows=2)  # doctest: +SKIP
    {'name': ['inc', 'add', 'dec'],
     'left': [0, 0, 1],
     'right': [0.9, 0.9, 1.9],
     'top': [0, -1, 0],
     'bottom': [-.8, -1.8, -.8],
     'released': [1, 1, 0],
     'memory': [2, 1, 0],
     'erred': [0, 0, 1],
     'processing': [1, 0, 2],
     'done': ['3 / 5', '2 / 4', '1 / 1'],
     'released-loc': [.2/.9, .25 / 0.9, 1],
     'memory-loc': [3 / 5 / .9, .5 / 0.9, 1],
     'erred-loc': [3 / 5 / .9, .5 / 0.9, 1.9],
     'processing-loc': [4 / 5, 1 / 1, 1]}}
    g?r   T)keyreverseNc                s$   i | ]\}  fd dD |qS )c                s   g | ]}  |d qS )r   )get)r   r$   )vr   r   
<listcomp>   s    z-progress_quads.<locals>.<dictcomp>.<listcomp>r   )r   k)r3   )r;   r   r      s    z"progress_quads.<locals>.<dictcomp>r$   c             S   s,   g | ]$}t |d kr|n|dd d qS )   N   z...)r   )r   r$   r   r   r   r<      s   z"progress_quads.<locals>.<listcomp>z	show-namec                s   g | ]}|  qS r   r   )r   i)nrowsr   r   r<      s    r&   c                s   g | ]}|   qS r   r   )r   r@   )rA   widthr   r   r<      s    r'   c                s   g | ]}|   qS r   r   )r   r@   )rA   r   r   r<      s    topc                s   g | ]}|   d  qS )g?r   )r   r@   )rA   r   r   r<      s    Zbottomc             S   s   g | ]}t |qS r   )r   )r   r$   r   r   r   r<      s    r)   zreleased-locz
memory-locz	erred-loczprocessing-locdoner   r   r   r   z%d / %d)r0   r:   r   itemsrangezipr1   )msgrA   Zncolsnr4   rmepalZrlZmlZelZplrD   r   )r3   rA   rB   r   progress_quadsc   s>    &rP   c             C   s(   | d dkr t | d }t|S dS d S )NZstatusZOKr8   Zblack)r   r   )rH   splitr   r   r   color_of_message   s    rR   ZredZorangeZgray)transferz
disk-writez	disk-readdeserializecomputeg?)rS   rU   rT   z
disk-writez	disk-readz	transfer-zdisk-write-z
disk-read-zdeserialize-r-   c             C   s*  |d }t |}|dg }x|D ]\}}}t| }	t|	tk	rJ|	|}	| d || d d  | d d||   | d | | d t| |  | d |	 | d	 t|  | d
 |d
  d|d
 |d f }
| d |
 |
|krt|d ||
< | d ||
  q$W t|S )Nr8   
startstopsstartr
   i  Zdurationr$   r)   Zalphaworkerz%s-%dZthreadworker_thready)	r   r:   colorstypestrr1   prefixalphasr   )ZlistsrH   Zworkersr8   r$   rV   actionrW   stopr)   rY   r   r   r   task_stream_append   s(    
rb   )r6   r7   )#Z
__future__r   r   r   ZloggingZtoolzr   r   Ztornador   Zprogressr	   Zcorer   r   r   r   Zutilsr   r   rX   r   Z	getLogger__name__Zloggerr   	coroutiner#   r5   rP   rR   r[   r_   r^   rb   r   r   r   r   <module>   s<   
,
=