ó
ßüÚ\c           @` s9  d  d l  m Z m Z m Z d  d l Z d  d l m Z d  d l Z d  d l Z d  d l	 Z	 d  d l
 Z
 d  d l Z d d l m Z m Z d d l m Z d  d l m Z d  d l m Z d  d	 l m Z e j e ƒ Z d
 „  Z d „  Z d e f d „  ƒ  YZ d e f d „  ƒ  YZ e j  ƒ  Z! e j" d „  ƒ Z# d S(   i    (   t   print_functiont   divisiont   absolute_importN(   t	   timedeltai   (   t   finalizet   Queue(   t
   mp_context(   t   gen(   t   Future(   t   IOLoopc         G` sL   y |  j  | | Œ Wn1 t k
 rG } t j d t | ƒ ƒ sH ‚  qH n Xd S(   sQ   
    Helper to silence "IOLoop is closing" exception on IOLoop.add_callback.
    s   IOLoop is clos(ed|ing)N(   t   add_callbackt   RuntimeErrort   ret   searcht   str(   t   loopt   funct   argst   exc(    (    s2   lib/python2.7/site-packages/distributed/process.pyt   _loop_add_callback   s
    c         O` sS   y | | | Ž  } Wn& t  k
 r; } t |  | j | ƒ n Xt |  | j | ƒ d  S(   N(   t	   ExceptionR   t   set_exceptiont
   set_result(   R   t   futureR   R   t   kwargst   resR   (    (    s2   lib/python2.7/site-packages/distributed/process.pyt   _call_and_set_future!   s
    t   _ProcessStatec           B` s   e  Z e Z d  Z d  Z RS(   N(   t   __name__t
   __module__t   Falset   is_alivet   Nonet   pidt   exitcode(    (    (    s2   lib/python2.7/site-packages/distributed/process.pyR   ,   s   t   AsyncProcessc           B` s%  e  Z d  Z d d d d i  d „ Z d „  Z d „  Z d „  Z d „  Z e	 d „  ƒ Z
 e d „  ƒ Z e	 d „  ƒ Z e	 d	 „  ƒ Z e	 d
 „  ƒ Z d „  Z d „  Z e j d d „ ƒ Z d „  Z d „  Z d „  Z e d „  ƒ Z e d „  ƒ Z e d „  ƒ Z e d „  ƒ Z e j d „  ƒ Z RS(   s„   
    A coroutine-compatible multiprocessing.Process-alike.
    All normally blocking methods are wrapped in Tornado coroutines.
    c         C` s÷   t  | ƒ s( t d t | ƒ f ƒ ‚ n  t ƒ  |  _ | pI t j d t ƒ |  _ t	 j
 d t ƒ \ } |  _ t	 j d |  j d | d | | | | |  j f ƒ |  _ t j |  j ƒ |  j j |  _ t ƒ  |  _ t ƒ  |  _ d  |  _ t |  _ |  j ƒ  d  S(   Ns%   `target` needs to be callable, not %rt   instancet   duplext   targett   nameR   (   t   callablet	   TypeErrort   typeR   t   _stateR	   t   currentR   t   _loopR   t   Pipet   _keep_child_alivet   Processt   _runt   _processt	   _danglingt   addR'   t   _namet   PyQueuet   _watch_qR   t   _exit_futureR    t   _exit_callbackt   _closedt   _start_threads(   t   selfR   R&   R'   R   R   t   parent_alive_pipe(    (    s2   lib/python2.7/site-packages/distributed/process.pyt   __init__8   s     				c         C` s   d |  j  j |  j f S(   Ns   <%s %s>(   t	   __class__R   R5   (   R<   (    (    s2   lib/python2.7/site-packages/distributed/process.pyt   __repr__W   s    c         C` s   |  j  r t d ƒ ‚ n  d  S(   Ns(   invalid operation on closed AsyncProcess(   R:   t
   ValueError(   R<   (    (    s2   lib/python2.7/site-packages/distributed/process.pyt   _check_closedZ   s    	c         C` s¢   t  j d |  j d d |  j d t j |  ƒ |  j |  j |  j |  j	 |  j
 f ƒ |  _ t |  j _ |  j j ƒ  d „  } t |  | d |  j	 ƒ|  _ t |  j _ d  S(   NR&   R'   s#   AsyncProcess %s watch message queueR   c         S` s   |  j  i d d 6ƒ d  S(   Nt   stopt   op(   t
   put_nowait(   t   q(    (    s2   lib/python2.7/site-packages/distributed/process.pyt   stop_threadn   s    RF   (   t	   threadingt   Threadt   _watch_message_queueR'   t   weakreft   refR2   R-   R+   R7   R8   t   _watch_message_threadt   Truet   daemont   startR   t
   _finalizerR   t   atexit(   R<   RG   (    (    s2   lib/python2.7/site-packages/distributed/process.pyR;   ^   s    			c         C` s<   d  |  _ |  j d  k	 r( |  j |  ƒ n  |  j j | ƒ d  S(   N(   R    R2   R9   R8   R   (   R<   R"   (    (    s2   lib/python2.7/site-packages/distributed/process.pyt   _on_exitv   s    	c         ` s8   ‡  f d †  } t  j d | ƒ } t | _ | j ƒ  d S(   sP   
        Immediately exit the process when parent_alive_pipe is closed.
        c           ` s?   y ˆ  j  ƒ  Wn t k
 r. t j d ƒ n Xt d ƒ ‚ d  S(   Niÿÿÿÿs'   unexpected state: should be unreachable(   t   recvt   EOFErrort   ost   _exitR   (    (   R=   (    s2   lib/python2.7/site-packages/distributed/process.pyt   monitor_parentƒ   s
    
R&   N(   RH   RI   RN   RO   RP   (   t   clsR=   RX   t   t(    (   R=   s2   lib/python2.7/site-packages/distributed/process.pyt   _immediate_exit_when_closed}   s    	c          C` sK   xD t  j j j j ƒ  D]- }  x$ t  j |  ƒ j D] } | j ƒ  q/ Wq Wd S(   sw    Python 2's logger's locks don't survive a fork event

        https://github.com/dask/distributed/issues/1491
        N(   t   loggingt   Loggert   managert
   loggerDictt   keyst	   getLoggert   handlerst
   createLock(   R'   t   handler(    (    s2   lib/python2.7/site-packages/distributed/process.pyt   reset_logger_locks   s    c         C` sA   |  j  ƒ  | j ƒ  |  j | ƒ d t j ƒ  _ | | | Ž  d  S(   Nt
   MainThread(   Re   t   closeR[   RH   t   current_threadR'   (   RY   R&   R   R   R=   R/   (    (    s2   lib/python2.7/site-packages/distributed/process.pyR1   §   s
    

c   
      ` sâ   t  ˆ ƒ  ƒ ‰ ˆ ƒ  j ‰  ‡  ‡ ‡ ‡ ‡ ‡ f d †  } x¢ t rÝ ˆ j ƒ  } t j d ˆ | f ƒ | d }	 |	 d k r’ t | | d | ƒ q< |	 d k r¸ t | | d ˆ j ƒ q< |	 d k rÈ Pq< d s< t | ƒ ‚ q< Wd  S(	   Nc       
   ` s   ˆ j  ƒ  t j d t j d d ˆ  d ˆ ˆ ˆ ˆ f ƒ }  t |  _ |  j  ƒ  t ˆ _ ˆ j ˆ _ t	 j
 d ˆ ˆ j f ƒ d  S(   NR&   R'   s"   AsyncProcess %s watch process joinR   s    [%s] created process with pid %r(   RP   RH   RI   R#   t   _watch_processRN   RO   R   R!   t   loggert   debug(   t   thread(   R'   t   processRF   t   rt   selfreft   state(    s2   lib/python2.7/site-packages/distributed/process.pyt   _start¿   s    
		
	
	s   [%s] got message %rRD   RP   R   t	   terminateRC   i    (	   t   reprR'   RN   t   getRj   Rk   R   Rr   t   AssertionError(
   RY   Ro   Rm   R   Rp   RF   t   exit_futureRq   t   msgRD   (    (   R'   Rm   RF   Rn   Ro   Rp   s2   lib/python2.7/site-packages/distributed/process.pyRJ   ·   s    	
c         C` sŸ   t  | ƒ  ƒ } | j ƒ  | j } | d  k	 s4 t ‚ t j d | | j | ƒ t | _	 | | _ | ƒ  } z) | d  k	 r t
 | j | j | ƒ n  Wd  d  } Xd  S(   Ns#   [%s] process %r exited with code %r(   Rs   t   joinR"   R    Ru   Rj   Rk   R!   R   R   R   R-   RS   (   RY   Ro   Rm   Rp   RF   Rn   R"   R<   (    (    s2   lib/python2.7/site-packages/distributed/process.pyRi   Û   s    
				c         C` s5   |  j  ƒ  t ƒ  } |  j j i d d 6| d 6ƒ | S(   sO   
        Start the child process.

        This method is a coroutine.
        RP   RD   R   (   RB   R   R7   RE   (   R<   t   fut(    (    s2   lib/python2.7/site-packages/distributed/process.pyRP   î   s    
	c         C` s5   |  j  ƒ  t ƒ  } |  j j i d d 6| d 6ƒ | S(   sS   
        Terminate the child process.

        This method is a coroutine.
        Rr   RD   R   (   RB   R   R7   RE   (   R<   Ry   (    (    s2   lib/python2.7/site-packages/distributed/process.pyRr   ù   s    
	c         c` s‘   |  j  ƒ  |  j j d k	 s( t d ƒ ‚ |  j j d k	 r> d S| d k rU |  j Vn8 y! t j t	 d | ƒ |  j ƒ VWn t j
 k
 rŒ n Xd S(   sZ   
        Wait for the child process to exit.

        This method is a coroutine.
        s   can only join a started processNt   seconds(   RB   R+   R!   R    Ru   R"   R8   R   t   with_timeoutR   t   TimeoutError(   R<   t   timeout(    (    s2   lib/python2.7/site-packages/distributed/process.pyRx     s    
!c         C` s,   |  j  s( |  j ƒ  d |  _ t |  _  n  d S(   s–   
        Stop helper thread and release resources.  This method returns
        immediately and does not ensure the child process has exited.
        N(   R:   RQ   R    R2   RN   (   R<   (    (    s2   lib/python2.7/site-packages/distributed/process.pyRg     s    	
	c         C` sC   t  | ƒ s t d ƒ ‚ |  j j d k s6 t d ƒ ‚ | |  _ d S(   sÑ   
        Set a function to be called by the event loop when the process exits.
        The function is called with the AsyncProcess as sole argument.

        The function may be a coroutine function.
        s    exit callback should be callables5   cannot set exit callback when process already startedN(   R(   Ru   R+   R!   R    R9   (   R<   R   (    (    s2   lib/python2.7/site-packages/distributed/process.pyt   set_exit_callback!  s    	c         C` s
   |  j  j S(   N(   R+   R   (   R<   (    (    s2   lib/python2.7/site-packages/distributed/process.pyR   /  s    c         C` s
   |  j  j S(   N(   R+   R!   (   R<   (    (    s2   lib/python2.7/site-packages/distributed/process.pyR!   2  s    c         C` s
   |  j  j S(   N(   R+   R"   (   R<   (    (    s2   lib/python2.7/site-packages/distributed/process.pyR"   6  s    c         C` s   |  j  S(   N(   R5   (   R<   (    (    s2   lib/python2.7/site-packages/distributed/process.pyR'   :  s    c         C` s
   |  j  j S(   N(   R2   RO   (   R<   (    (    s2   lib/python2.7/site-packages/distributed/process.pyRO   >  s    c         C` s   | |  j  _ d  S(   N(   R2   RO   (   R<   t   value(    (    s2   lib/python2.7/site-packages/distributed/process.pyRO   B  s    N(    (   R   R   t   __doc__R    R>   R@   RB   R;   RS   t   classmethodR[   t   staticmethodRe   R1   RJ   Ri   RP   Rr   R   t	   coroutineRx   Rg   R~   R   t   propertyR!   R"   R'   RO   t   setter(    (    (    s2   lib/python2.7/site-packages/distributed/process.pyR#   2   s.   				 
$			
		c          C` si   xb t  t ƒ D]T }  |  j r |  j ƒ  r y" t j d |  f ƒ |  j ƒ  Wqa t k
 r] qa Xq q Wd  S(   Ns   reaping stray process %s(   t   listR3   RO   R   Rj   t   warningRr   t   OSError(   t   proc(    (    s2   lib/python2.7/site-packages/distributed/process.pyt   _cleanup_danglingJ  s    ($   t
   __future__R    R   R   RR   t   datetimeR   R\   RV   R   RH   RK   t   compatibilityR   R   R6   t   utilsR   t   tornadoR   t   tornado.concurrentR   t   tornado.ioloopR	   Ra   R   Rj   R   R   t   objectR   R#   t   WeakSetR3   t   registerRŠ   (    (    (    s2   lib/python2.7/site-packages/distributed/process.pyt   <module>   s(   		ÿ 