ó
ßüÚ\c           @` sä   d  Z  d d l m Z m Z m Z d d l m Z d d l m	 Z	 d d l
 Z
 d d l Z d d l Z d d l Z d d l m Z e j e ƒ Z e j ƒ  Z d „  Z d	 e j f d
 „  ƒ  YZ e d „ Z d „  Z e j ƒ  Z d S(   sÌ  
Modified ThreadPoolExecutor to support threads leaving the thread pool

This includes a global `secede` method that a submitted function can call to
have its thread leave the ThreadPoolExecutor's thread pool.  This allows the
thread pool to allocate another thread if necessary and so is useful when a
function realises that it is going to be a long-running job that doesn't want
to take up space.  When the function finishes its thread will terminate
gracefully.

This code copies and modifies two functions from the
`concurrent.futures.thread` module, notably `_worker` and
ThreadPoolExecutor._adjust_thread_count` to allow for checking against a global
`threading.local` state.  These functions are subject to the following license,
which is included as a comment at the end of this file:

    https://docs.python.org/3/license.html

... and are under copyright by the Python Software Foundation

   Copyright 2001-2016 Python Software Foundation; All Rights Reserved
i    (   t   print_functiont   divisiont   absolute_importi   (   t   _concurrent_futures_thread(   t   EmptyN(   t   timec         C` sB  t  t _ |  t _ zyñ xç t j r|  j W |  j r€ |  j j ƒ  \ } } |  j j | ƒ |  j j	 t
 j ƒ  ƒ | j ƒ  Pn  Wd  QXy | j d d ƒ } Wn t k
 r² q n X| d  k	 rÏ | j ƒ  ~ q t j sí |  d  k sí |  j r | j d  ƒ d  Sq W~  Wn$ t k
 r,t j d d t  ƒn XWd  t ` t ` Xd  S(   Nt   timeouti   s   Exception in workert   exc_info(   t   Truet   thread_statet   proceedt   executort   _rejoin_lockt   _rejoin_listt   popt   _threadst   addt   removet	   threadingt   current_threadt   sett   getR   t   Nonet   runt   threadt	   _shutdownt   putt   BaseExceptiont   loggert   critical(   R   t
   work_queuet   rejoin_threadt   rejoin_eventt   task(    (    s=   lib/python2.7/site-packages/distributed/threadpoolexecutor.pyt   _worker'   s6    		 
	


t   ThreadPoolExecutorc           B` s5   e  Z e j ƒ  Z d  „  Z d „  Z e d d „ Z	 RS(   c         O` sJ   t  t |  ƒ j | | Ž  g  |  _ t j ƒ  |  _ | j d d ƒ |  _ d  S(   Nt   thread_name_prefixt   DaskThreadPoolExecutor(	   t   superR#   t   __init__R   R   t   LockR   R   t   _thread_name_prefix(   t   selft   argst   kwargs(    (    s=   lib/python2.7/site-packages/distributed/threadpoolexecutor.pyR'   J   s
    	c      	   C` s‰   t  |  j ƒ |  j k  r… t j d t d |  j d t j ƒ  t	 |  j
 ƒ f d |  |  j f ƒ } t | _ |  j j | ƒ | j ƒ  n  d  S(   Nt   targett   names   -%d-%dR+   (   t   lenR   t   _max_workersR   t   ThreadR"   R)   t   ost   getpidt   nextt   _countert   _work_queueR   t   daemonR   t   start(   R*   t   t(    (    s=   lib/python2.7/site-packages/distributed/threadpoolexecutor.pyt   _adjust_thread_countR   s    	 	c      
   C` s®   t  ¢ |  j  t |  _ |  j j d  ƒ Wd  QX| d  k	 rL t ƒ  | } n  | r¤ xO |  j D]A } | d  k	 r‡ t	 | t ƒ  d ƒ } n d  } | j
 d | ƒ q\ Wn  Wd  QXd  S(   Ni    R   (   t   threads_lockt   _shutdown_lockR   R   R6   R   R   R   R   t   maxt   join(   R*   t   waitR   t   deadlineR9   t   timeout2(    (    s=   lib/python2.7/site-packages/distributed/threadpoolexecutor.pyt   shutdown^   s    
	N(
   t   __name__t
   __module__t	   itertoolst   countR5   R'   R:   R   R   RB   (    (    (    s=   lib/python2.7/site-packages/distributed/threadpoolexecutor.pyR#   F   s   		c         C` sI   t  t _ t 4 t j j j t j ƒ  ƒ |  r? t j j	 ƒ  n  Wd QXd S(   sw    Have this thread secede from the ThreadPoolExecutor

    See Also
    --------
    rejoin: rejoin the thread pool
    N(
   t   FalseR	   R
   R;   R   R   R   R   R   R:   (   t   adjust(    (    s=   lib/python2.7/site-packages/distributed/threadpoolexecutor.pyt   seceden   s
    	c          C` sn   t  j ƒ  }  t  j ƒ  } t j } | j  | j j |  | f ƒ Wd QX| j d „  ƒ | j	 ƒ  t
 t _ d S(   s   Have this thread rejoin the ThreadPoolExecutor

    This will block until a new slot opens up in the executor.  The next thread
    to finish a task will leave the pool to allow this one to join.

    See Also
    --------
    secede: leave the thread pool
    Nc           S` s   d  S(   N(   R   (    (    (    s=   lib/python2.7/site-packages/distributed/threadpoolexecutor.pyt   <lambda>‹   t    (   R   R   t   EventR	   R   R   R   t   appendt   submitR?   R   R
   (   R   t   eventt   e(    (    s=   lib/python2.7/site-packages/distributed/threadpoolexecutor.pyt   rejoin|   s    
	

(   t   __doc__t
   __future__R    R   R   RK   R   R   t   compatibilityR   R2   t   loggingR   RE   t   metricsR   t	   getLoggerRC   R   t   localR	   R"   R#   R   RI   RQ   R(   R;   (    (    (    s=   lib/python2.7/site-packages/distributed/threadpoolexecutor.pyt   <module>   s    	(	/