ó
šßÈ[c           @   sF  d  Z  d d l Z d d l Z d d l Z d d l Z d d l Z d d l m Z d d l m	 Z	 y d d l
 Z
 Wn e k
 r‘ d d l Z
 n Xd Z e ƒ  Z e a d „  Z d	 „  Z d Z d
 e f d „  ƒ  YZ d e f d „  ƒ  YZ d e f d „  ƒ  YZ d „  Z d „  Z d „  Z d e j f d „  ƒ  YZ e j e ƒ d S(   s
  Implements ProcessPoolExecutor.

The follow diagram and text describe the data-flow through the system::

    |==================== In-process ====================|== Out-of-process ==|

    +----------+     +-------+       +--------+     +----------+    +---------+
    |          |  => | Work  |    => |        |  => | Call Q   | => |         |
    |          |     | Ids   |       |        |     |          |    |         |
    |          |     +-------+       |        |     +----------+    |         |
    |          |     | ...   |       |        |     |...       |    |         |
    |          |     | 6     |       |        |     |5, call() |    |         |
    |          |     | 7     |       |        |     |...       |    |         |
    | Process  |     | ...   |       | Local  |     +----------+    | Process |
    |  Pool    |     +-------+       | Worker |                     |  #1..n  |
    | Executor |                     | Thread |                     |         |
    |          |     +---------+     |        |     +----------+    |         |
    |          | <=> |  Work   | <=> |        | <=  | Result Q | <= |         |
    |          |     |  Items  |     |        |     |          |    |         |
    |          |     +---------+     |        |     +----------+    |         |
    |          |     |6: call()|     |        |     |...       |    |         |
    |          |     |   future|     |        |     |4, result |    |         |
    |          |     |...      |     |        |     |3, except |    |         |
    +----------+     +---------+     +--------+     +----------+    +---------+

Executor.submit() called:
    - creates a uniquely numbered _WorkItem and adds it to the 'Work Items'
      dict
    - adds the id of the _WorkItem to the 'Work Ids' queue

Local worker thread:
    - reads work ids from the 'Work Ids' queue and looks up the corresponding
      WorkItem from the 'Work Items' dict: if the work item has been cancelled
      then it is simply removed from the dict, otherwise it is repackaged as a
      _CallItem and put in the 'Call Q'. New _CallItems are put in the 'Call Q'
      until 'Call Q' is full. NOTE: the size of the 'Call Q' is kept small
      because calls placed in the 'Call Q' can no longer be cancelled with
      Future.cancel().
    - reads _ResultItems from 'Result Q', updates the future stored in the
      'Work Items' dict and deletes the dict entry

Process #1..n:
    - reads _CallItems from 'Call Q', executes the calls, and puts the
      resulting _ResultItems in 'Request Q'
iÿÿÿÿNi   (   t   _basei   (   t   ranges"   Brian Quinlan (brian@sweetapp.com)c          C   s=   t  a x0 t D]( }  |  ƒ  } | d  k	 r | j ƒ  q q Wd  S(   N(   t   Truet	   _shutdownt   _thread_referencest   Nonet   join(   t   thread_referencet   thread(    (    sC   lib/python2.7/site-packages/astropy/utils/compat/futures/process.pyt   _python_exitU   s
    	c          C   s:   x3 t  t ƒ D]% }  |  ƒ  d k r t j |  ƒ q q Wd S(   s  Remove inactive threads from _thread_references.

    Should be called periodically to prevent memory leaks in scenarios such as:
        >>> while True:
        ...    t = ThreadPoolExecutor(max_workers=5)
        ...    t.map(int, ['1', '2', '3', '4', '5'])
    N(   t   setR   R   t   discard(   R   (    (    sC   lib/python2.7/site-packages/astropy/utils/compat/futures/process.pyt   _remove_dead_thread_references^   s    t	   _WorkItemc           B   s   e  Z d  „  Z RS(   c         C   s(   | |  _  | |  _ | |  _ | |  _ d  S(   N(   t   futuret   fnt   argst   kwargs(   t   selfR   R   R   R   (    (    sC   lib/python2.7/site-packages/astropy/utils/compat/futures/process.pyt   __init__s   s    			(   t   __name__t
   __module__R   (    (    (    sC   lib/python2.7/site-packages/astropy/utils/compat/futures/process.pyR   r   s   t   _ResultItemc           B   s   e  Z d d d  „ Z RS(   c         C   s   | |  _  | |  _ | |  _ d  S(   N(   t   work_idt	   exceptiont   result(   R   R   R   R   (    (    sC   lib/python2.7/site-packages/astropy/utils/compat/futures/process.pyR   {   s    		N(   R   R   R   R   (    (    (    sC   lib/python2.7/site-packages/astropy/utils/compat/futures/process.pyR   z   s   t	   _CallItemc           B   s   e  Z d  „  Z RS(   c         C   s(   | |  _  | |  _ | |  _ | |  _ d  S(   N(   R   R   R   R   (   R   R   R   R   R   (    (    sC   lib/python2.7/site-packages/astropy/utils/compat/futures/process.pyR   ‚   s    			(   R   R   R   (    (    (    sC   lib/python2.7/site-packages/astropy/utils/compat/futures/process.pyR      s   c         C   sÌ   xÅ t  rÇ y |  j d t  d d ƒ } Wn$ t j k
 rK | j ƒ  rÄ d Sq Xy | j | j | j Ž  } Wn= t k
 r§ t	 j
 ƒ  d } | j t | j d | ƒƒ q X| j t | j d | ƒƒ q Wd S(   s  Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    Parameters
    ----------
    call_queue
        A `multiprocessing.Queue` of _CallItems that will be read and
        evaluated by the worker.

    result_queue
        A `multiprocessing.Queue` of _ResultItems that will written
        to by the worker.

    shutdown
        A `multiprocessing.Event` that will be set as a signal to the
        worker that it should exit when call_queue is empty.
    t   blockt   timeoutgš™™™™™¹?Ni   R   R   (   R   t   gett   queuet   Emptyt   is_setR   R   R   t   BaseExceptiont   syst   exc_infot   putR   R   (   t
   call_queuet   result_queuet   shutdownt	   call_itemt   rt   e(    (    sC   lib/python2.7/site-packages/astropy/utils/compat/futures/process.pyt   _process_worker‰   s    	c         C   s    x™ t  r› | j ƒ  r d Sy | j d t ƒ } Wn t j k
 rF d SX|  | } | j j ƒ  rŽ | j t	 | | j
 | j | j ƒ d t  ƒq |  | =q q Wd S(   sX  Fills call_queue with _WorkItems from pending_work_items.

    This function never blocks.

    Parameters
    ----------
    pending_work_items
        A dict mapping work ids to _WorkItems e.g.
        {5: <_WorkItem...>, 6: <_WorkItem...>, ...}

    work_ids
        A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
        are consumed and the corresponding _WorkItems from
        pending_work_items are transformed into _CallItems and put in
        call_queue.

    call_queue
        A multiprocessing.Queue that will be filled with _CallItems
        derived from _WorkItems.
    NR   (   R   t   fullR   t   FalseR   R   R   t   set_running_or_notify_cancelR$   R   R   R   R   (   t   pending_work_itemst   work_idsR%   R   t	   work_item(    (    sC   lib/python2.7/site-packages/astropy/utils/compat/futures/process.pyt   _add_call_item_to_queue®   s     	

c         C   sö   xï t  rñ t | | | ƒ y | j d t  d d ƒ } Wnm t j k
 r¤ |  ƒ  } t sl | d k sl | j rž | sž | j ƒ  x | D] }	 |	 j	 ƒ  qƒ Wd Sn  ~ q X| | j
 }
 | | j
 =| j rÛ |
 j j | j ƒ q |
 j j | j ƒ q Wd S(   sI  Manages the communication between this process and the worker processes.

    This function is run in a local thread.

    Parameters
    ----------
    executor_reference
        A weakref.ref to the ProcessPoolExecutor that owns
        this thread. Used to determine if the ProcessPoolExecutor has been
        garbage collected and that this function can exit.

    process
        A list of the multiprocessing.Process instances used as
        workers.

    pending_work_items
        A dict mapping work ids to _WorkItems e.g.
        {5: <_WorkItem...>, 6: <_WorkItem...>, ...}

    work_ids_queue
        A queue.Queue of work ids e.g. Queue([5, 6, ...]).

    call_queue
        A multiprocessing.Queue that will be filled with _CallItems
        derived from _WorkItems for processing by the process workers.

    result_queue
        A multiprocessing.Queue of _ResultItems generated by the
        process workers.

    shutdown_process_event
        A multiprocessing.Event used to signal the
        process workers that they should exit when their work queue is
        empty.
    R   R   gš™™™™™¹?N(   R   R2   R   R   R   R   R   t   _shutdown_threadR
   R   R   R   R   t   set_exceptiont
   set_resultR   (   t   executor_referencet	   processesR/   t   work_ids_queueR%   R&   t   shutdown_process_eventt   result_itemt   executort   pR1   (    (    sC   lib/python2.7/site-packages/astropy/utils/compat/futures/process.pyt   _queue_manangement_workerÚ   s(    *		

	t   ProcessPoolExecutorc           B   s_   e  Z d d  „ Z d „  Z d „  Z d „  Z e j j j	 e _	 e
 d „ Z e j j j	 e _	 RS(   c         C   s·   t  ƒ  | d k r% t j ƒ  |  _ n	 | |  _ t j |  j t ƒ |  _ t j ƒ  |  _ t	 j ƒ  |  _
 d |  _ t ƒ  |  _ t |  _ t j ƒ  |  _ t j ƒ  |  _ d |  _ i  |  _ d S(   sF  Initializes a new ProcessPoolExecutor instance.

        Parameters
        ----------
        max_workers
            The maximum number of processes that can be used to
            execute the given calls. If None or not given then as many
            worker processes will be created as the machine has processors.
        i    N(   R   R   t   multiprocessingt	   cpu_countt   _max_workerst   Queuet   EXTRA_QUEUED_CALLSt   _call_queuet   _result_queueR   t	   _work_idst   _queue_management_threadR
   t
   _processesR-   R3   t   Eventt   _shutdown_process_eventt	   threadingt   Lockt   _shutdown_lockt   _queue_countt   _pending_work_items(   R   t   max_workers(    (    sC   lib/python2.7/site-packages/astropy/utils/compat/futures/process.pyR   )  s    
				c         C   s“   |  j  d  k r t j d t d t j |  ƒ |  j |  j |  j	 |  j
 |  j |  j f ƒ |  _  t |  j  _ |  j  j ƒ  t j t j |  j  ƒ ƒ n  d  S(   Nt   targetR   (   RG   R   RK   t   ThreadR=   t   weakreft   refRH   RO   RF   RD   RE   RJ   R   t   daemont   startR   t   add(   R   (    (    sC   lib/python2.7/site-packages/astropy/utils/compat/futures/process.pyt   _start_queue_management_threadK  s    	c         C   sn   xg t  t |  j ƒ |  j ƒ D]J } t j d t d |  j |  j |  j	 f ƒ } | j
 ƒ  |  j j | ƒ q Wd  S(   NRQ   R   (   R   t   lenRH   RA   R?   t   ProcessR+   RD   RE   RJ   RV   RW   (   R   t   _R<   (    (    sC   lib/python2.7/site-packages/astropy/utils/compat/futures/process.pyt   _adjust_process_countZ  s    "
c      	   O   s—   |  j  ˆ |  j r" t d ƒ ‚ n  t j ƒ  } t | | | | ƒ } | |  j |  j <|  j j	 |  j ƒ |  j d 7_ |  j
 ƒ  |  j ƒ  | SWd  QXd  S(   Ns*   cannot schedule new futures after shutdowni   (   RM   R3   t   RuntimeErrorR    t   FutureR   RO   RN   RF   R$   RX   R\   (   R   R   R   R   t   ft   w(    (    sC   lib/python2.7/site-packages/astropy/utils/compat/futures/process.pyt   submitc  s    
	

c         C   sl   |  j   t |  _ Wd  QX| r; |  j r; |  j j ƒ  q; n  d  |  _ d  |  _ d  |  _ d  |  _ d  |  _	 d  S(   N(
   RM   R   R3   RG   R   R   RD   RE   RJ   RH   (   R   t   wait(    (    sC   lib/python2.7/site-packages/astropy/utils/compat/futures/process.pyR'   u  s    
					N(   R   R   R   R   RX   R\   Ra   R    t   Executort   __doc__R   R'   (    (    (    sC   lib/python2.7/site-packages/astropy/utils/compat/futures/process.pyR>   (  s   "				(   Rd   t   atexitR?   RK   RS   R"   t    R    t   extern.six.movesR   R   t   ImportErrorRB   t
   __author__R
   R   R-   R   R	   R   RC   t   objectR   R   R   R+   R2   R=   Rc   R>   t   register(    (    (    sC   lib/python2.7/site-packages/astropy/utils/compat/futures/process.pyt   <module>0   s2   					%	,	N]