B
    Ô4\Ì  ã               @   s¦   d Z ddlmZmZmZ ddlmZ ddlm	Z	 ddl
Z
ddlZddlZddlZddlmZ e e¡Ze ¡ Zdd	„ ZG d
d„ dejƒZddd„Zdd„ Ze ¡ ZdS )aÌ  
Modified ThreadPoolExecutor to support threads leaving the thread pool

This includes a global `secede` method that a submitted function can call to
have its thread leave the ThreadPoolExecutor's thread pool.  This allows the
thread pool to allocate another thread if necessary and so is useful when a
function realises that it is going to be a long-running job that doesn't want
to take up space.  When the function finishes its thread will terminate
gracefully.

This code copies and modifies two functions from the
`concurrent.futures.thread` module, notably `_worker` and
ThreadPoolExecutor._adjust_thread_count` to allow for checking against a global
`threading.local` state.  These functions are subject to the following license,
which is included as a comment at the end of this file:

    https://docs.python.org/3/license.html

... and are under copyright by the Python Software Foundation

   Copyright 2001-2016 Python Software Foundation; All Rights Reserved
é    )Úprint_functionÚdivisionÚabsolute_importé   )Ú_concurrent_futures_thread)ÚEmptyN)Útimec          	   C   s   dt _| t _zäy¼x´t jrÄ| j@ | jrZ| j ¡ \}}| j |¡ | j t	 
¡ ¡ | ¡  P W d Q R X y|jdd}W n tk
rŠ   wY nX |d k	r | ¡  ~qtjs´| d ks´| jr| d ¡ d S qW ~ W n" tk
rì   tjddd Y nX W d t `t `X d S )NTr   )ÚtimeoutzException in worker)Úexc_info)Úthread_stateÚproceedÚexecutorÚ_rejoin_lockÚ_rejoin_listÚpopÚ_threadsÚaddÚremoveÚ	threadingÚcurrent_threadÚsetÚgetr   ÚrunÚthreadÚ	_shutdownÚputÚBaseExceptionÚloggerZcritical)r   Z
work_queueZrejoin_threadZrejoin_eventZtask© r   ú=lib/python3.7/site-packages/distributed/threadpoolexecutor.pyÚ_worker'   s4    
r    c                   s6   e Zd Ze ¡ Z‡ fdd„Zdd„ Zd	dd„Z‡  Z	S )
ÚThreadPoolExecutorc                s4   t t| ƒj||Ž g | _t ¡ | _| dd¡| _d S )NZthread_name_prefixZDaskThreadPoolExecutor)	Úsuperr!   Ú__init__r   r   ÚLockr   r   Ú_thread_name_prefix)ÚselfÚargsÚkwargs)Ú	__class__r   r   r#   J   s    
zThreadPoolExecutor.__init__c             C   s\   t | jƒ| jk rXtjt| jdt ¡ t	| j
ƒf  | | jfd}d|_| j |¡ | ¡  d S )Nz-%d-%d)ÚtargetÚnamer'   T)Úlenr   Z_max_workersr   ZThreadr    r%   ÚosÚgetpidÚnextÚ_counterÚ_work_queueZdaemonr   Ústart)r&   Útr   r   r   Ú_adjust_thread_countP   s    z'ThreadPoolExecutor._adjust_thread_countTNc          
   C   sˆ   t z | j d| _| j d ¡ W d Q R X |d k	r<tƒ | }|rzx8| jD ].}|d k	rft|tƒ  dƒ}nd }|j|d qHW W d Q R X d S )NTr   )r	   )	Úthreads_lockZ_shutdown_lockr   r1   r   r   r   ÚmaxÚjoin)r&   Úwaitr	   Zdeadliner3   Ztimeout2r   r   r   ÚshutdownY   s    
zThreadPoolExecutor.shutdown)TN)
Ú__name__Ú
__module__Ú__qualname__Ú	itertoolsÚcountr0   r#   r4   r9   Ú__classcell__r   r   )r)   r   r!   F   s   	r!   Tc          	   C   s:   dt _t& t jj t ¡ ¡ | r,t j ¡  W dQ R X dS )zw Have this thread secede from the ThreadPoolExecutor

    See Also
    --------
    rejoin: rejoin the thread pool
    FN)	r   r   r5   r   r   r   r   r   r4   )Zadjustr   r   r   Úsecedei   s
    r@   c           	   C   sX   t  ¡ } t  ¡ }tj}|j |j | |f¡ W dQ R X | dd„ ¡ | 	¡  dt_
dS )a   Have this thread rejoin the ThreadPoolExecutor

    This will block until a new slot opens up in the executor.  The next thread
    to finish a task will leave the pool to allow this one to join.

    See Also
    --------
    secede: leave the thread pool
    Nc               S   s   d S )Nr   r   r   r   r   Ú<lambda>†   s    zrejoin.<locals>.<lambda>T)r   r   ZEventr   r   r   r   ÚappendZsubmitr8   r   )r   ZeventÚer   r   r   Úrejoinw   s    
rD   )T)Ú__doc__Z
__future__r   r   r   Ú r   r   Zcompatibilityr   r-   Zloggingr   r=   Zmetricsr   Z	getLoggerr:   r   Zlocalr   r    r!   r@   rD   r$   r5   r   r   r   r   Ú<module>   s    
#
/