B
    T\^,                 @   s   d dl mZmZmZ d dlmZ d dlmZ d dlm	Z	 d dl
mZ d dlmZmZmZ ddlmZ dd	lmZ ed
dZG dd deZeddZG dd deZG dd deZeddZG dd deZdS )    )absolute_importdivisionprint_function)
namedtuple)starmap)default_timer)sleep)ProcessPipecurrent_process   )Callback)import_requiredTaskData)keytaskZ
start_timeZend_timeZ	worker_idc                   s`   e Zd ZdZdd Z fddZdd Zdd	 Zd
d Zdd Z	dd Z
dd Zdd Z  ZS )Profilera4  A profiler for dask execution at the task level.

    Records the following information for each task:
        1. Key
        2. Task
        3. Start time in seconds since the epoch
        4. Finish time in seconds since the epoch
        5. Worker id

    Examples
    --------

    >>> from operator import add, mul
    >>> from dask.threaded import get
    >>> dsk = {'x': 1, 'y': (add, 'x', 10), 'z': (mul, 'y', 2)}
    >>> with Profiler() as prof:
    ...     get(dsk, 'z')
    22

    >>> prof.results  # doctest: +SKIP
    [('y', (add, 'x', 10), 1435352238.48039, 1435352238.480655, 140285575100160),
     ('z', (mul, 'y', 2), 1435352238.480657, 1435352238.480803, 140285566707456)]

    These results can be visualized in a bokeh plot using the ``visualize``
    method. Note that this requires bokeh to be installed.

    >>> prof.visualize() # doctest: +SKIP

    You can activate the profiler globally

    >>> prof.register()  # doctest: +SKIP

    If you use the profiler globally you will need to clear out old results
    manually.

    >>> prof.clear()

    c             C   s   i | _ g | _i | _d S )N)_resultsresults_dsk)self r   7lib/python3.7/site-packages/dask/diagnostics/profile.py__init__9   s    zProfiler.__init__c                s   |    tt|  S )N)clearsuperr   	__enter__)r   )	__class__r   r   r   >   s    zProfiler.__enter__c             C   s   | j | d S )N)r   update)r   dskr   r   r   _startB   s    zProfiler._startc             C   s   t  }||| |f| j|< d S )N)r   r   )r   r   r   statestartr   r   r   _pretaskE   s    zProfiler._pretaskc             C   s    t  }| j|  ||f7  < d S )N)r   r   )r   r   valuer   r!   idendr   r   r   	_posttaskI   s    zProfiler._posttaskc             C   sB   t dd | j D }|  jttt| 7  _| j  d S )Nc             s   s&   | ]\}}t |d kr||fV  qdS )   N)len).0kvr   r   r   	<genexpr>N   s    z#Profiler._finish.<locals>.<genexpr>)	dictr   itemsr   listr   r   valuesr   )r   r   r!   failedr   r   r   r   _finishM   s    zProfiler._finishc             K   s   ddl m} || j| jf|S )N   )
plot_tasks)profile_visualizer5   r   r   )r   kwargsr5   r   r   r   _plotR   s    zProfiler._plotc             K   s   ddl m} || f|S )zVisualize the profiling run in a bokeh plot.

        See also
        --------
        dask.diagnostics.profile_visualize.visualize
        r4   )	visualize)r6   r9   )r   r7   r9   r   r   r   r9   V   s    zProfiler.visualizec             C   s    | j   | jdd= i | _dS )z#Clear out old results from profilerN)r   r   r   r   )r   r   r   r   r   `   s    
zProfiler.clear)__name__
__module____qualname____doc__r   r   r    r#   r'   r3   r8   r9   r   __classcell__r   r   )r   r   r      s   &
r   ResourceData)timememcpuc                   s   e Zd ZdZdddZdd Zdd Zd	d
 Z fddZ fddZ	dd Z
dd Zdd ZeZdd Zdd Zdd Z  ZS )ResourceProfilera  A profiler for resource use.

    Records the following each timestep
        1. Time in seconds since the epoch
        2. Memory usage in MB
        3. % CPU usage

    Examples
    --------

    >>> from operator import add, mul
    >>> from dask.threaded import get
    >>> dsk = {'x': 1, 'y': (add, 'x', 10), 'z': (mul, 'y', 2)}
    >>> with ResourceProfiler() as prof:  # doctest: +SKIP
    ...     get(dsk, 'z')
    22

    These results can be visualized in a bokeh plot using the ``visualize``
    method. Note that this requires bokeh to be installed.

    >>> prof.visualize() # doctest: +SKIP

    You can activate the profiler globally

    >>> prof.register()  # doctest: +SKIP

    If you use the profiler globally you will need to clear out old results
    manually.

    >>> prof.clear()  # doctest: +SKIP

    Note that when used as a context manager data will be collected throughout
    the duration of the enclosed block. In contrast, when registered globally
    data will only be collected while a dask scheduler is active.
    r4   c             C   s   || _ d| _d | _g | _d S )NF)_dt_entered_trackerr   )r   dtr   r   r   r      s    zResourceProfiler.__init__c             C   s   | j d k	o| j  S )N)rF   Zis_alive)r   r   r   r   _is_running   s    zResourceProfiler._is_runningc             C   s0   |   st| j| _| j  | jjd d S )Ncollect)rH   _TrackerrD   rF   r"   parent_connsend)r   r   r   r   _start_collect   s    
zResourceProfiler._start_collectc             C   s4   |   r0| jjd | jtt| jj  d S )N	send_data)	rH   rF   rK   rL   r   extendr   r?   recv)r   r   r   r   _stop_collect   s    zResourceProfiler._stop_collectc                s$   d| _ |   |   tt|  S )NT)rE   r   rM   r   rC   r   )r   )r   r   r   r      s    zResourceProfiler.__enter__c                s*   d| _ |   |   tt| j|  d S )NF)rE   rQ   closer   rC   __exit__)r   args)r   r   r   rS      s    zResourceProfiler.__exit__c             C   s   |    d S )N)rM   )r   r   r   r   r   r       s    zResourceProfiler._startc             C   s   | j s|   d S )N)rE   rQ   )r   r   r!   r2   r   r   r   r3      s    zResourceProfiler._finishc             C   s   |   r| j  d| _dS )z%Shutdown the resource tracker processN)rH   rF   shutdown)r   r   r   r   rR      s    
zResourceProfiler.closec             C   s
   g | _ d S )N)r   )r   r   r   r   r      s    zResourceProfiler.clearc             K   s   ddl m} || jf|S )Nr4   )plot_resources)r6   rV   r   )r   r7   rV   r   r   r   r8      s    zResourceProfiler._plotc             K   s   ddl m} || f|S )zVisualize the profiling run in a bokeh plot.

        See also
        --------
        dask.diagnostics.profile_visualize.visualize
        r4   )r9   )r6   r9   )r   r7   r9   r   r   r   r9      s    zResourceProfiler.visualize)r4   )r:   r;   r<   r=   r   rH   rM   rQ   r   rS   r    r3   rR   __del__r   r8   r9   r>   r   r   )r   r   rC   j   s   #
rC   c               @   s2   e Zd ZdZdddZdd Zdd Zd	d
 ZdS )rJ   z.Background process for tracking resource usager4   c             C   s2   t |  d| _|| _t j| _t \| _| _	d S )NT)
r	   r   ZdaemonrG   r   pid
parent_pidr
   rK   
child_conn)r   rG   r   r   r   r      s
    

z_Tracker.__init__c             C   s*   | j js| j d | j   |   d S )NrU   )rK   closedrL   rR   join)r   r   r   r   rU      s    
z_Tracker.shutdownc                s    | j g fdd| j  D  S )Nc                s&   g | ]}|j  kr| d kr|qS )Zzombie)rX   Zstatus)r*   p)rX   r   r   
<listcomp>   s    z)_Tracker._update_pids.<locals>.<listcomp>)parentZchildren)r   rX   r   )rX   r   _update_pids   s    z_Tracker._update_pidsc          	   C   s  t dd}|| j| _t }g }xy| j }W n tk
rH   w$Y nX |dkrVP q$|dkr| |}x|rx| j	 st
 }d }}xH|D ]@}	y|	 j}
|	 }W n tk
r   Y qX ||
7 }||7 }qW |||d |f t| j qjW q$|dkr$| j| g }q$W | j  d S )Npsutilz9Tracking resource usage requires `psutil` to be installedrU   rI   r   g    .ArN   )r   r	   rY   r_   r   rZ   rP   KeyboardInterruptr`   Zpollr   Zmemory_infoZrssZcpu_percent	Exceptionappendr   rG   rL   rR   )r   ra   rX   datamsgZpsZticrA   rB   r]   Zmem2Zcpu2r   r   r   run   s<    



z_Tracker.runN)r4   )r:   r;   r<   r=   r   rU   r`   rg   r   r   r   r   rJ      s
   
rJ   	CacheData)r   r   metricZ
cache_timeZ	free_timec                   sZ   e Zd ZdZdddZ fddZdd Zd	d
 Zdd Zdd Z	dd Z
dd Z  ZS )CacheProfilera  A profiler for dask execution at the scheduler cache level.

    Records the following information for each task:
        1. Key
        2. Task
        3. Size metric
        4. Cache entry time in seconds since the epoch
        5. Cache exit time in seconds since the epoch

    Examples
    --------

    >>> from operator import add, mul
    >>> from dask.threaded import get
    >>> dsk = {'x': 1, 'y': (add, 'x', 10), 'z': (mul, 'y', 2)}
    >>> with CacheProfiler() as prof:
    ...     get(dsk, 'z')
    22

    >>> prof.results    # doctest: +SKIP
    [CacheData('y', (add, 'x', 10), 1, 1435352238.48039, 1435352238.480655),
     CacheData('z', (mul, 'y', 2), 1, 1435352238.480657, 1435352238.480803)]

    The default is to count each task (``metric`` is 1 for all tasks). Other
    functions may used as a metric instead through the ``metric`` keyword. For
    example, the ``nbytes`` function found in ``cachey`` can be used to measure
    the number of bytes in the cache.

    >>> from cachey import nbytes    # doctest: +SKIP
    >>> with CacheProfiler(metric=nbytes) as prof:  # doctest: +SKIP
    ...     get(dsk, 'z')

    The profiling results can be visualized in a bokeh plot using the
    ``visualize`` method. Note that this requires bokeh to be installed.

    >>> prof.visualize() # doctest: +SKIP

    You can activate the profiler globally

    >>> prof.register()  # doctest: +SKIP

    If you use the profiler globally you will need to clear out old results
    manually.

    >>> prof.clear()

    Nc             C   s>   |    |r|ndd | _|r&|| _n|r4|j| _nd| _d S )Nc             S   s   dS )Nr4   r   )r$   r   r   r   <lambda>?  s    z(CacheProfiler.__init__.<locals>.<lambda>count)r   _metric_metric_namer:   )r   ri   Zmetric_namer   r   r   r   =  s    
zCacheProfiler.__init__c                s   |    tt|  S )N)r   r   rj   r   )r   )r   r   r   r   G  s    zCacheProfiler.__enter__c             C   s   | j | | jst | _d S )N)r   r   _start_timer   )r   r   r   r   r   r    K  s    zCacheProfiler._startc       
   	   C   sd   t  }| ||f| j|< xD|d | jD ]0}| j|\}}	| jt||| ||	| q,W d S )NZreleased)r   rm   _cacheintersectionpopr   rd   rh   )
r   r   r$   r   r!   r%   tr+   ri   r"   r   r   r   r'   P  s
    zCacheProfiler._posttaskc          	   C   sL   t  }x6| j D ](\}\}}| jt||| ||| qW | j  d S )N)r   rp   r/   r   rd   rh   r   )r   r   r!   r2   rs   r+   ri   r"   r   r   r   r3   W  s     zCacheProfiler._finishc             K   s&   ddl m} || j| j| j| jf|S )Nr4   )
plot_cache)r6   rt   r   r   ro   rn   )r   r7   rt   r   r   r   r8   ]  s    zCacheProfiler._plotc             K   s   ddl m} || f|S )zVisualize the profiling run in a bokeh plot.

        See also
        --------
        dask.diagnostics.profile_visualize.visualize
        r4   )r9   )r6   r9   )r   r7   r9   r   r   r   r9   b  s    zCacheProfiler.visualizec             C   s   g | _ i | _i | _d| _dS )z#Clear out old results from profilerN)r   rp   r   ro   )r   r   r   r   r   l  s    zCacheProfiler.clear)NN)r:   r;   r<   r=   r   r   r    r'   r3   r8   r9   r   r>   r   r   )r   r   rj     s   /


rj   N)Z
__future__r   r   r   collectionsr   	itertoolsr   Ztimeitr   r@   r   Zmultiprocessingr	   r
   r   Z	callbacksr   Zutilsr   r   r   r?   rC   rJ   rh   rj   r   r   r   r   <module>   s   
U
e9
