B
    ;T\                 @   sf   d dl Z d dlZd dlmZ d dlZddlmZ ddlmZm	Z	m
Z
mZ e eZG dd deZdS )	    N)ref   )Adaptive   )format_bytesPeriodicCallback
log_errorsignoringc               @   sP   e Zd ZdZdd Zedd Zedd Zdd	 Zd
d Z	dd Z
dd ZdS )Clustera   Superclass for cluster objects

    This expects a local Scheduler defined on the object.  It provides
    common methods and an IPython widget display.

    Clusters inheriting from this class should provide the following:

    1.  A local ``Scheduler`` object at ``.scheduler``
    2.  scale_up and scale_down methods as defined below::

        def scale_up(self, n: int):
            ''' Brings total worker count up to ``n`` '''

        def scale_down(self, workers: List[str]):
            ''' Close the workers with the given addresses '''

    This will provide a general ``scale`` method as well as an IPython widget
    for display.

    Examples
    --------

    >>> from distributed.deploy import Cluster
    >>> class MyCluster(cluster):
    ...     def scale_up(self, n):
    ...         ''' Bring the total worker count up to n '''
    ...         pass
    ...     def scale_down(self, workers):
    ...         ''' Close the workers with the given addresses '''
    ...         pass

    >>> cluster = MyCluster()
    >>> cluster.scale(5)                       # scale manually
    >>> cluster.adapt(minimum=1, maximum=100)  # scale automatically

    See Also
    --------
    LocalCluster: a simple implementation with local workers
    c          	   K   sT   t t | j  W dQ R X t| ds.i | _| j| t| j| f| j| _| jS )z Turn on adaptivity

        For keyword arguments see dask.distributed.Adaptive

        Examples
        --------
        >>> cluster.adapt(minimum=0, maximum=10, interval='500ms')
        N_adaptive_options)	r	   AttributeError	_adaptivestophasattrr   updater   	scheduler)selfkwargs r   9lib/python3.7/site-packages/distributed/deploy/cluster.pyadapt6   s    	

zCluster.adaptc             C   s   | j jS )N)r   address)r   r   r   r   scheduler_addressG   s    zCluster.scheduler_addressc             C   sN   t jd}| jjdd dd }| jjd j}|jf ||dt	j
S )Nzdistributed.dashboard.linkz://r   :r   bokeh)hostport)daskZconfiggetr   r   splitservicesr   formatosenviron)r   templater   r   r   r   r   dashboard_linkK   s    zCluster.dashboard_linkc          	   C   s   t  x |t| jjkr,| jj| j| nN| jjt| jj| d}t	d| | jjj| jj
|d | jj| j| W dQ R X dS )a5   Scale cluster to n workers

        Parameters
        ----------
        n: int
            Target number of workers

        Example
        -------
        >>> cluster.scale(10)  # scale cluster to ten workers

        See Also
        --------
        Cluster.scale_up
        Cluster.scale_down
        )nzClosing workers: %s)workersN)r   lenr   r'   loopZadd_callbackZscale_upZworkers_to_closeloggerdebugZretire_workersZ
scale_down)r   r&   Zto_closer   r   r   scaleR   s    zCluster.scalec             C   sZ   t | jj}tdd | jj D }tdd | jj D }t|}d|||f }|S )Nc             s   s   | ]}|j V  qd S )N)Zncores).0wsr   r   r   	<genexpr>o   s    z)Cluster._widget_status.<locals>.<genexpr>c             s   s   | ]}|j V  qd S )N)Zmemory_limit)r-   r.   r   r   r   r/   p   s    a  
<div>
  <style scoped>
    .dataframe tbody tr th:only-of-type {
        vertical-align: middle;
    }

    .dataframe tbody tr th {
        vertical-align: top;
    }

    .dataframe thead th {
        text-align: right;
    }
  </style>
  <table style="text-align: right;">
    <tr><th>Workers</th> <td>%d</td></tr>
    <tr><th>Cores</th> <td>%d</td></tr>
    <tr><th>Memory</th> <td>%s</td></tr>
  </table>
</div>
)r(   r   r'   sumvaluesr   )r   r'   ZcoresZmemorytextr   r   r   _widget_statusm   s    zCluster._widget_statusc                s  yj S  tk
r   Y nX ddlm}m}m}m}m}m}m	} |dd}dj
jkrjj}	d|	|	f }	nd}	dtj }
||
}
||	}| |dd	d
|dd|d|d|d}|dd|d|dd|d |d|d}|||g| |gg|dd	d
}d|_|dd |dd ||
||g|g}|_  fdd}|| fdd}|| tj
}fdd}t|dj
jd}|j
jd< |  |S )z5 Create IPython widget for display within a notebook r   )LayoutVBoxHBoxIntTextButtonHTML	AccordionZ150px)widthr   z=<p><b>Dashboard: </b><a href="%s" target="_blank">%s</a></p>
 z<h2>%s</h2>)Z	min_width)layoutZWorkers)Zdescriptionr=   ZScaleZMinimumZMaximumZAdaptZ500pxNzManual Scalingr   zAdaptive Scalingc                s   j j jd d S )N)minimummaximum)r   value)b)r?   r>   r   r   r   adapt_cb   s    z!Cluster._widget.<locals>.adapt_cbc          
      sD   t  4  j}tt j  W d Q R X | W d Q R X d S )N)r   r@   r	   r   r   r   r,   )rA   r&   )requestr   r   r   scale_cb   s
    
z!Cluster._widget.<locals>.scale_cbc                  s      _d S )N)r3   r@   r   )r   statusr   r   r      s    zCluster._widget.<locals>.updatei  )Zio_loopzcluster-repr)Z_cached_widgetr   Z
ipywidgetsr4   r5   r6   r7   r8   r9   r:   r   r    r%   type__name__r3   Zselected_indexZ	set_titleZon_clickr   r   r)   Zperiodic_callbacksstart)r   r4   r5   r6   r7   r8   r9   r:   r=   linktitleZ	dashboardr,   r   Z	accordionZboxrB   rD   Zscheduler_refr   Zpcr   )r?   r>   rC   r   rE   r   _widget   sP    $



zCluster._widgetc             K   s   |   jf |S )N)rK   _ipython_display_)r   r   r   r   r   rL      s    zCluster._ipython_display_N)rG   
__module____qualname____doc__r   propertyr   r%   r,   r3   rK   rL   r   r   r   r   r
      s   'Er
   )Zloggingr"   weakrefr   r   Zadaptiver   Zutilsr   r   r   r	   Z	getLoggerrG   r*   objectr
   r   r   r   r   <module>   s   
