from __future__ import print_function, division, absolute_import

from collections import deque
import logging
import math

import toolz
from tornado import gen

from ..metrics import time
from ..utils import log_errors, PeriodicCallback, parse_timedelta

logger = logging.getLogger(__name__)


class Adaptive(object):
    """
    Adaptively allocate workers based on scheduler load.  A superclass.

    Contains logic to dynamically resize a Dask cluster based on current use.
    This class needs to be paired with a system that can create and destroy
    Dask workers using a cluster resource manager.  Typically it is built into
    already existing solutions, rather than used directly by users.
    It is most commonly used from the ``.adapt(...)`` method of various Dask
    cluster classes.

    Parameters
    ----------
    scheduler: distributed.Scheduler
    cluster: object
        Must have scale_up and scale_down methods/coroutines
    startup_cost : timedelta or str, default "1s"
        Estimate of how long it takes to start an additional worker; this
        affects how quickly we adapt to high task-per-worker loads.
    interval : timedelta or str, default "1000 ms"
        Time between checks
    wait_count: int, default 3
        Number of consecutive times that a worker should be suggested for
        removal before we remove it.
    scale_factor : int, default 2
        Factor to scale by when it's determined additional workers are needed
    target_duration: timedelta or str, default "5s"
        Amount of time we want a computation to take.
        This affects how aggressively we scale up.
    worker_key: Callable[WorkerState]
        Function to group workers together when scaling down
        See Scheduler.workers_to_close for more information
    minimum: int
        Minimum number of workers to keep around
    maximum: int
        Maximum number of workers to keep around
    **kwargs:
        Extra parameters to pass to Scheduler.workers_to_close

    Examples
    --------

    This is commonly used from existing Dask classes, like KubeCluster

    >>> from dask_kubernetes import KubeCluster
    >>> cluster = KubeCluster()
    >>> cluster.adapt(minimum=10, maximum=100)

    Alternatively you can use it from your own Cluster class by subclassing
    from Dask's Cluster superclass

    >>> from distributed.deploy import Cluster
    >>> class MyCluster(Cluster):
    ...     def scale_up(self, n):
    ...         """ Bring worker count up to n """
    ...     def scale_down(self, workers):
    ...         """ Remove worker addresses from cluster """

    >>> cluster = MyCluster()
    >>> cluster.adapt(minimum=10, maximum=100)
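
    To group workers for scale-down decisions, pass ``worker_key`` (here
    grouping workers by host; ``ws.host`` is assumed to be available on the
    scheduler's WorkerState):

    >>> cluster.adapt(minimum=10, maximum=100,
    ...               worker_key=lambda ws: ws.host)  # doctest: +SKIP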

    Notes
    -----
    Subclasses can override :meth:`Adaptive.should_scale_up` and
    :meth:`Adaptive.workers_to_close` to control when the cluster should be
    resized. The default implementation checks if there are too many tasks
    per worker or too little memory available (see :meth:`Adaptive.needs_cpu`
    and :meth:`Adaptive.needs_memory`).
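
    For example, a subclass might veto scale-up with an external quota check
    (``get_quota`` is a hypothetical helper, not part of this module):

    >>> class QuotaAdaptive(Adaptive):  # doctest: +SKIP
    ...     def should_scale_up(self):
    ...         if len(self.scheduler.workers) >= get_quota():
    ...             return False
    ...         return super(QuotaAdaptive, self).should_scale_up()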

    :meth:`Adaptive.get_scale_up_kwargs` method controls the arguments passed to
    the cluster's ``scale_up`` method.
    t   1si   i    i   t   5sc         C` s   |  S(   N(    (   t   x(    (    s:   lib/python2.7/site-packages/distributed/deploy/adaptive.pyt   <lambda>d   t    c         K` s   t  | d d } |
 |  _ | |  _ | |  _ t  | d d |  _ | |  _ |  j r t |  j | d d | j |  _	 |  j j j
 |  j	 j  n  t |  _ | |  _ | |  _ | |  _ t d d  |  _ i  |  _ | |  _ t  |	  |  _ |  j |  j j d <d  S(   Nt   defaultt   mst   siθ  t   io_loopt   maxlent   adaptive_recommendations(   R   t
   worker_keyt	   schedulert   clustert   startup_costt   scale_factorR   t   _adaptt   loopt   _adapt_callbackt   add_callbackt   startt   Falset	   _adaptingt   _workers_to_close_kwargst   minimumt   maximumR   t   logt   close_countst
   wait_countt   target_durationt   recommendationst   handlers(   t   selfR   R   t   intervalR   R   R"   R#   R&   R'   R   t   kwargs(    (    s:   lib/python2.7/site-packages/distributed/deploy/adaptive.pyt   __init__Y   s&    											c         C` s,   |  j  r( |  j j   d  |  _ |  ` n  d  S(   N(   R   R   t   stopt   None(   R*   (    (    s:   lib/python2.7/site-packages/distributed/deploy/adaptive.pyR.   }   s    		c         C` s¦   |  j  j } |  j  j } | | d |  j d k r’ t j d | |  d } xS |  j  j j   D]< } | t | j	  7} | | k r_ t j d | |  t
 Sq_ Wn  t S(   s  
        Check if the cluster is CPU constrained (too many tasks per core)

        Notes
        -----
        Returns ``True`` if the occupancy per core is some factor larger
        than ``startup_cost`` and the number of tasks exceeds the number of
        cores.
        gΦ&θ.>i   s,   CPU limit exceeded [%d occupancy / %d cores]i    s:   pending tasks exceed number of cores [%d tasks / %d cores](   R   t   total_occupancyt   total_ncoresR   t   loggert   infot   workerst   valuest   lent
   processingt   TrueR   (   R*   R0   t   total_corest   tasks_processingt   w(    (    s:   lib/python2.7/site-packages/distributed/deploy/adaptive.pyt	   needs_cpu   s"    
c         C` s   d   |  j  j j   D } g  |  j  j j   D] } | j ^ q/ } t | j    } t |  } | d | k r t j d | |  t St	 Sd S(   sθ   
        Check if the cluster is RAM constrained

        Notes
        -----
        Returns ``True`` if the bytes stored in distributed memory exceed
        some fraction of the total distributed memory available.
        """
        limit_bytes = {addr: ws.memory_limit
                       for addr, ws in self.scheduler.workers.items()}
        worker_bytes = [ws.nbytes for ws in self.scheduler.workers.values()]

        limit = sum(limit_bytes.values())
        total = sum(worker_bytes)
        if total > 0.6 * limit:
            logger.info("Ram limit exceeded [%d/%d]", limit, total)
            return True
        else:
            return False

    def should_scale_up(self):
        """
        Determine whether additional workers should be added to the cluster

        Returns
        -------
        scale_up : bool

        Notes
        -----
        Additional workers are added whenever

        1. There are unrunnable tasks and no workers
        2. The cluster is CPU constrained
        3. The cluster is RAM constrained
        4. There are fewer workers than our minimum

        See Also
        --------
        needs_cpu
        needs_memory
        """
        with log_errors():
            if len(self.scheduler.workers) < self.minimum:
                return True

            if self.maximum is not None and \
                    len(self.scheduler.workers) >= self.maximum:
                return False

            if self.scheduler.unrunnable and not self.scheduler.workers:
                return True

            needs_cpu = self.needs_cpu()
            needs_memory = self.needs_memory()

            if needs_cpu or needs_memory:
                return True

            return False

    def workers_to_close(self, **kwargs):
        """
        Determine which, if any, workers should potentially be removed from
        the cluster.

        Notes
        -----
        ``Adaptive.workers_to_close`` dispatches to Scheduler.workers_to_close(),
        but may be overridden in subclasses.

        Returns
        -------
        List of worker addresses to close, if any
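
        For example (illustrative addresses; the actual result depends on
        current cluster state):

        >>> adaptive.workers_to_close()  # doctest: +SKIP
        ['tcp://127.0.0.1:36375', 'tcp://127.0.0.1:41521']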

        See Also
        --------
        Scheduler.workers_to_close
        t   nN(
   R6   R   R4   R"   t   dictR!   t   updateR#   R/   t   workers_to_close(   R*   R,   t   kwt   L(    (    s:   lib/python2.7/site-packages/distributed/deploy/adaptive.pyRN   ε   s    * % c         c` sΑ   | d  k r- |  j d |  j d |  j  } n  | sE t j |   n  t   m |  j j d | d t	 d t	  Vt
 j d |  |  j j |  } t j |  r¨ | Vn  t j |   Wd  QXd  S(   Nt   keyR"   R4   t   removet   close_workerss   Retiring workers %s(   R/   RN   R   R"   R   t   ReturnR   R   t   retire_workersR8   R2   R3   R   t
   scale_downt	   is_future(   R*   R4   t   f(    (    s:   lib/python2.7/site-packages/distributed/deploy/adaptive.pyt   _retire_workers  s    !
    def get_scale_up_kwargs(self):
        """
        Get the arguments to be passed to ``self.cluster.scale_up``.

        Notes
        -----
        By default the desired number of total workers is returned (``n``).
        Subclasses should ensure that the return dictionary includes a key-
        value pair for ``n``, either by implementing it or by calling the
        parent's ``get_scale_up_kwargs``.
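
        For example, a subclass targeting a hypothetical resource manager
        could extend the parent's dictionary (``worker_type`` is an assumed
        extra keyword, not defined by this class):

        >>> def get_scale_up_kwargs(self):  # doctest: +SKIP
        ...     kwargs = super(MyAdaptive, self).get_scale_up_kwargs()
        ...     kwargs['worker_type'] = 'highmem'
        ...     return kwargs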

        See Also
        --------
        LocalCluster.scale_up
        """
        target = math.ceil(self.scheduler.total_occupancy /
                           self.target_duration)
        instances = max(1,
                        len(self.scheduler.workers) * self.scale_factor,
                        target,
                        self.minimum)
        if self.maximum:
            instances = min(self.maximum, instances)
        instances = int(instances)
        logger.info("Scaling up to %d workers", instances)
        return {'n': instances}

    def recommendations(self, comm=None):
        should_scale_up = self.should_scale_up()
        workers = set(self.workers_to_close(key=self.worker_key,
                                            minimum=self.minimum))
        if should_scale_up and workers:
            logger.info("Attempting to scale up and scale down "
                        "simultaneously.")
            self.close_counts.clear()
            return {'status': 'error',
                    'msg': 'Trying to scale up and down simultaneously'}

        elif should_scale_up:
            self.close_counts.clear()
            return toolz.merge({'status': 'up'}, self.get_scale_up_kwargs())

        elif workers:
            d = {}
            to_close = []
            # Only close workers that have been suggested for removal
            # `wait_count` consecutive times
            for w, c in self.close_counts.items():
                if w in workers:
                    if c >= self.wait_count:
                        to_close.append(w)
                    else:
                        d[w] = c

            for w in workers:
                d[w] = d.get(w, 0) + 1

            self.close_counts = d

            if to_close:
                return {'status': 'down', 'workers': to_close}
        else:
            self.close_counts.clear()
            return None

    @gen.coroutine
    def _adapt(self):
        if self._adapting:  # Semaphore to avoid overlapping adapt calls
            return
        self._adapting = True

        try:
            recommendations = self.recommendations()
            if not recommendations:
                return
            status = recommendations.pop('status')
            if status == 'up':
                f = self.cluster.scale_up(**recommendations)
                self.log.append((time(), 'up', recommendations))
                if gen.is_future(f):
                    yield f

            elif status == 'down':
                self.log.append((time(), 'down', recommendations['workers']))
                workers = yield self._retire_workers(
                    workers=recommendations['workers'])
        finally:
            self._adapting = False

    def adapt(self):
        self.scheduler.loop.add_callback(self._adapt)