B
    ^\5                 @   s  d dl mZmZmZ d dlZd dlmZ d dlZd dlZd dl	Z	d dl
Z
d dlZd dlmZ d dlmZ ddlmZ dd	lmZ dd
lmZ ddlmZmZmZmZmZmZmZmZ ddlm Z  ddl!m"Z" ddl#m$Z$m%Z% e&e'Z(G dd deZ)dd Z*e
+ Z,ej-dd Z.dS )    )print_functiondivisionabsolute_importN)	timedelta)factors)gen   )Cluster   )get_thread_identity)CommClosedError)syncignoringAllsilence_logging
LoopRunner
log_errorsthread_stateparse_timedelta)Nanny)	Scheduler)Worker_ncoresc               @   s  e Zd ZdZdddddddejddddddfddZd	d
 Zdd Ze	dd Z
dd Zdd Zejd4ddZejd5ddZdd Zejdd Zdd Zejd6dd Zd7d"d#Zejd$d% Zejd&d' Zd(d) Zd*d+ Zd,d- Zejd.d/ Zejd0d1 Ze	d2d3 ZdS )8LocalClustera   Create local Scheduler and Workers

    This creates a "cluster" of a scheduler and workers running on the local
    machine.

    Parameters
    ----------
    n_workers: int
        Number of workers to start
    processes: bool
        Whether to use processes (True) or threads (False).  Defaults to True
    threads_per_worker: int
        Number of threads per each worker
    scheduler_port: int
        Port of the scheduler.  8786 by default, use 0 to choose a random port
    silence_logs: logging level
        Level of logs to print out to stdout.  ``logging.WARN`` by default.
        Use a falsey value like False or None for no change.
    ip: string
        IP address on which the scheduler will listen, defaults to only localhost
    diagnostics_port: int
        Port on which the :doc:`web` will be provided.  8787 by default, use 0
        to choose a random port, ``None`` to disable it, or an
        :samp:`({ip}:{port})` tuple to listen on a different IP address than
        the scheduler.
    asynchronous: bool (False by default)
        Set to True if using this cluster within async/await functions or within
        Tornado gen.coroutines.  This should remain False for normal use.
    kwargs: dict
        Extra worker arguments, will be passed to the Worker constructor.
    service_kwargs: Dict[str, Dict]
        Extra keywords to hand to the running services
    security : Security

    Examples
    --------
    >>> c = LocalCluster()  # Create a local cluster with as many workers as cores  # doctest: +SKIP
    >>> c  # doctest: +SKIP
    LocalCluster("127.0.0.1:8786", workers=8, ncores=8)

    >>> c = Client(c)  # connect to local cluster  # doctest: +SKIP

    Add a new worker to the cluster

    >>> w = c.start_worker(ncores=2)  # doctest: +SKIP

    Shut down the extra worker

    >>> c.stop_worker(w)  # doctest: +SKIP

    Pass extra keyword arguments to Bokeh

    >>> LocalCluster(service_kwargs={'bokeh': {'prefix': '/foo'}})  # doctest: +SKIP
    NTr   iS"  Fc             K   s  |d k	rd}t |d | _|| _|| _|| _|| _|
p8i }
|p@i }|rRt|d| _|d kr||d kr||rttt	\}}nd}t	}|d kr|d k	rt
dt	| }|r|d krt
dttt	| }|||d t||d| _| jj| _|	dk	r^|	d k	r^yddlm} dd	lm} W n  tk
r6   td
 Y n(X ||pBi di f|
d|	f< ||d< t| j|
|d| _|| _g | _|| _|r|| jd< | j||d t !|  d S )NzThe start= parameter is deprecated. LocalCluster always starts. For asynchronous operation use the following: 

  cluster = yield LocalCluster(asynchronous=True))levelr   )ncoresservices)loopasynchronousFr   )BokehScheduler)BokehWorkerz4To start diagnostics web server please install Bokehbokeh)r!   r   )r   r   securityr"   )ip	n_workers)"
ValueErrorstatus	processessilence_logs_asynchronousr"   r   _old_logging_levelnprocesses_nthreadsr   maxintmathZceilupdater   _loop_runnerr   Zdistributed.bokeh.schedulerr   Zdistributed.bokeh.workerr    ImportErrorloggerdebuggetr   	schedulerscheduler_portworkersworker_kwargsstartclusters_to_closeadd)selfr$   Zthreads_per_workerr'   r   r9   r#   r6   r(   Zdiagnostics_portr   Zworker_servicesZservice_kwargsr   r"   r8   msgr   r     r>   7lib/python3.7/site-packages/distributed/deploy/local.py__init__Q   sX    



zLocalCluster.__init__c             C   s&   d| j t| jtdd | jD f S )Nz'LocalCluster(%r, workers=%d, ncores=%d)c             s   s   | ]}|j V  qd S )N)r   ).0wr>   r>   r?   	<genexpr>   s    z(LocalCluster.__repr__.<locals>.<genexpr>)scheduler_addresslenr7   sum)r<   r>   r>   r?   __repr__   s    zLocalCluster.__repr__c             C   s
   | j  S )N)_started	__await__)r<   r>   r>   r?   rI      s    zLocalCluster.__await__c             C   s,   | j p*ttddp*t| jdo*| jjt kS )Nr   F_thread_identity)r)   getattrr   hasattrr   rJ   r   )r<   r>   r>   r?   r      s    zLocalCluster.asynchronousc             O   s^   | dd s| jrF| dd }|||}|d k	rBtt|d|}|S t| j|f||S d S )Nr   callback_timeout)seconds)popr   r   with_timeoutr   r   r   )r<   funcargskwargsrM   Zfuturer>   r>   r?   r      s    
zLocalCluster.syncc             K   s4   | j   | jr | jf || _n| j| jf| d S )N)r0   r9   r)   _startrH   r   )r<   rS   r>   r>   r?   r9      s    
zLocalCluster.startc             #   s    j dkrdS |dkr( js( js(d}n8|dk	rJ|drJd| jf }n|dkrVd}| jf} j|  fddt|D V  d _ t dS )	z-
        Start all cluster services.
        runningNz	inproc://ztls://z%s:%dz	127.0.0.1c                s   g | ]} j f  jqS r>   )_start_workerr8   )rA   i)r<   r>   r?   
<listcomp>   s    z'LocalCluster._start.<locals>.<listcomp>)	r&   r6   r'   
startswithr5   r9   ranger   Return)r<   r#   r$   rD   r>   )r<   r?   rT      s    

zLocalCluster._start<   c             k   s   | j r&| j dr&td| j   d S | jr:t}d|d< nt}|| jjf| j	|| j
d|}| V  | j| x(|j dkr|j| jjkrtdV  qvW |j dkr| jj dkr| j| td	t|d S )
NZclosz*Tried to start a worker while status=='%s'Tquiet)r   death_timeoutr(   closedg{Gz?rU   zWorker failed to start)r&   rY   warningswarnr'   r   r   r5   addressr   r(   rT   r7   appendworker_addressr   ZsleepremoveTimeoutErrorr[   )r<   r^   rS   WrB   r>   r>   r?   rV      s$    


zLocalCluster._start_workerc             K   s   | j | jf|S )a   Add a new worker to the running cluster

        Parameters
        ----------
        port: int (optional)
            Port on which to serve the worker, defaults to 0 or random
        ncores: int (optional)
            Number of threads to use.  Defaults to number of logical cores

        Examples
        --------
        >>> c = LocalCluster()  # doctest: +SKIP
        >>> c.start_worker(ncores=2)  # doctest: +SKIP

        Returns
        -------
        The created Worker or Nanny object.  Can be discarded.
        )r   rV   )r<   rS   r>   r>   r?   start_worker   s    zLocalCluster.start_workerc             c   s$   |  V  || jkr | j| d S )N)_closer7   re   )r<   rB   r>   r>   r?   _stop_worker   s    

zLocalCluster._stop_workerc             C   s   |  | j| dS )z Stop a running worker

        Examples
        --------
        >>> c = LocalCluster()  # doctest: +SKIP
        >>> w = c.start_worker(ncores=2)  # doctest: +SKIP
        >>> c.stop_worker(w)  # doctest: +SKIP
        N)r   rj   )r<   rB   r>   r>   r?   stop_worker  s    	zLocalCluster.stop_worker2sc          	   #   s    j dkrd S d _  j  ttj2 ttt|dt	 fdd j
D V  W d Q R X  j
d d = z:ttjtt  jjddV  W d Q R X  j
d d = W d d _ X d S )Nr_   closing)rN   c                s   g | ]}  |qS r>   )rj   )rA   rB   )r<   r>   r?   rX     s    z'LocalCluster._close.<locals>.<listcomp>T)Zfast)r&   r5   Zclear_task_stater   r   rf   rP   r   r   r   r7   r   OSErrorclose)r<   timeoutr>   )r<   r?   ri     s    

&zLocalCluster._close   c                s~    j dkrdS y j j|d}W n tk
r:   d}Y nX t drj jr`| fdd n
t j  jsz j	
  |S )z Close the cluster r_   N)rM   r*   c                s
   t  jS )N)r   r*   )_)r<   r>   r?   <lambda>2  s    z$LocalCluster.close.<locals>.<lambda>)r&   r   ri   RuntimeErrorrL   r   Zadd_done_callbackr   r*   r0   stop)r<   rp   resultr>   )r<   r?   ro   &  s    




zLocalCluster.closec          	   +   s\   t  L tj|  fddt|tjj D V  dd jD _W dQ R X dS )z Bring the total count of workers up to ``n``

        This function/coroutine should bring the total number of workers up to
        the number ``n``.

        This can be implemented either as a function or as a Tornado coroutine.
        c                s   g | ]}j f  qS r>   )rV   )rA   rW   )kwargs2r<   r>   r?   rX   F  s   z)LocalCluster.scale_up.<locals>.<listcomp>c             S   s   g | ]}|j d kr|qS )r_   )r&   )rA   rB   r>   r>   r?   rX   J  s    N)r   toolzmerger8   rZ   rE   r5   r7   )r<   nrS   r>   )rw   r<   r?   scale_up;  s
    	zLocalCluster.scale_upc          	   #   sj   t  Z dd  jD  _ttdd D rHfdd jD  fddD V  W dQ R X dS )	aB   Remove ``workers`` from the cluster

        Given a list of worker addresses this function should remove those
        workers from the cluster.  This may require tracking which jobs are
        associated to which worker address.

        This can be implemented either as a function or as a Tornado coroutine.
        c             S   s   g | ]}|j d kr|qS )r_   )r&   )rA   rB   r>   r>   r?   rX   X  s    z+LocalCluster.scale_down.<locals>.<listcomp>c             s   s   | ]}t |tV  qd S )N)
isinstancestr)rA   rB   r>   r>   r?   rC   \  s    z*LocalCluster.scale_down.<locals>.<genexpr>c                s   h | ]}|j  kr|qS r>   )rd   )rA   rB   )r7   r>   r?   	<setcomp>]  s    z*LocalCluster.scale_down.<locals>.<setcomp>c                s   g | ]}  |qS r>   )rj   )rA   rB   )r<   r>   r?   rX   `  s    N)r   r7   setall)r<   r7   r>   )r<   r7   r?   
scale_downL  s    
zLocalCluster.scale_downc             C   s   |    d S )N)ro   )r<   r>   r>   r?   __del__b  s    zLocalCluster.__del__c             C   s   | S )Nr>   )r<   r>   r>   r?   	__enter__e  s    zLocalCluster.__enter__c             G   s   |    d S )N)ro   )r<   rR   r>   r>   r?   __exit__h  s    zLocalCluster.__exit__c             c   s   | j V  t| d S )N)rH   r   r[   )r<   r>   r>   r?   
__aenter__k  s    zLocalCluster.__aenter__c             c   s   |   V  d S )N)ri   )r<   typvalue	tracebackr>   r>   r?   	__aexit__p  s    zLocalCluster.__aexit__c             C   s"   y| j jS  tk
r   dS X d S )Nz<unstarted>)r5   rb   r%   )r<   r>   r>   r?   rD   t  s    zLocalCluster.scheduler_address)Nr   )r\   )rl   )rq   )__name__
__module____qualname____doc__loggingZWARNr@   rG   rI   propertyr   r   r9   r   	coroutinerT   rV   rh   rj   rk   ri   ro   r{   r   r   r   r   r   r   rD   r>   r>   r>   r?   r      s:   6=
r   c                s8    dkr }nt  fddt D } | }||fS )aB  
    The default breakdown of processes and threads for a given number of cores

    Parameters
    ----------
    n: int
        Number of available cores

    Examples
    --------
    >>> nprocesses_nthreads(4)
    (4, 1)
    >>> nprocesses_nthreads(32)
    (8, 4)

    Returns
    -------
    nprocesses, nthreads
       c             3   s    | ]}|t  kr|V  qd S )N)r.   Zsqrt)rA   f)rz   r>   r?   rC     s    z&nprocesses_nthreads.<locals>.<genexpr>)minr   )rz   r'   Zthreadsr>   )rz   r?   r+   |  s
    r+   c              C   s"   xt tD ]} | jdd q
W d S )N
   )rp   )listr:   ro   )clusterr>   r>   r?   close_clusters  s    r   )/Z
__future__r   r   r   atexitZdatetimer   r   r.   r`   weakrefrx   Z
dask.utilsr   Ztornador   r   r	   Zcompatibilityr   Zcorer   Zutilsr   r   r   r   r   r   r   r   Znannyr   r5   r   Zworkerr   r   Z	getLoggerr   r2   r   r+   WeakSetr:   registerr   r>   r>   r>   r?   <module>   s.   (
  d