B
    F.\+                 @   s   d dl mZmZmZ d dlZd dlmZ d dlZd dlZd dl	Z	d dl
Z
d dlZddlmZmZ ddlmZ d dlmZ d dlmZ d d	lmZ eeZd
d Zdd ZG dd deZG dd deZe  Z!ej"dd Z#dS )    )print_functiondivisionabsolute_importN)	timedelta   )finalizeQueue)
mp_context)gen)Future)IOLoopc          
   G   sN   y| j |f|  W n4 tk
rH } ztdt|s8 W dd}~X Y nX dS )zQ
    Helper to silence "IOLoop is closing" exception on IOLoop.add_callback.
    zIOLoop is clos(ed|ing)N)Zadd_callbackRuntimeErrorresearchstr)loopfuncargsexc r   2lib/python3.7/site-packages/distributed/process.py_loop_add_callback   s
    r   c          
   O   sR   y|||}W n0 t k
r> } zt| |j| W d d }~X Y nX t| |j| d S )N)	Exceptionr   Zset_exception
set_result)r   futurer   r   kwargsZresr   r   r   r   _call_and_set_future!   s
     r   c               @   s   e Zd ZdZdZdZdS )_ProcessStateFN)__name__
__module____qualname__is_alivepidexitcoder   r   r   r   r   ,   s   r   c               @   s   e Zd ZdZddddi fddZdd Zdd	 Zd
d Zdd Ze	dd Z
edd Ze	dd Ze	dd Ze	dd Zdd Zdd Zejd-ddZdd Zd d! Zd"d# Zed$d% Zed&d' Zed(d) Zed*d+ Zejd,d+ ZdS ).AsyncProcessz
    A coroutine-compatible multiprocessing.Process-alike.
    All normally blocking methods are wrapped in Tornado coroutines.
    Nr   c             C   s   t |stdt|f t | _|p0tjdd| _tj	dd\}| _
tj| j|||||| j
fd| _t| j | jj| _t | _t | _d | _d| _|   d S )Nz%`target` needs to be callable, not %rF)instance)Zduplex)targetnamer   )callable	TypeErrortyper   _stater   Zcurrent_loopr	   ZPipe_keep_child_aliveZProcess_run_process	_danglingaddr'   _namePyQueue_watch_qr   _exit_future_exit_callback_closed_start_threads)selfr   r&   r'   r   r   parent_alive_piper   r   r   __init__8   s"    

zAsyncProcess.__init__c             C   s   d| j j| jf S )Nz<%s %s>)	__class__r   r2   )r9   r   r   r   __repr__W   s    zAsyncProcess.__repr__c             C   s   | j rtdd S )Nz(invalid operation on closed AsyncProcess)r7   
ValueError)r9   r   r   r   _check_closedZ   s    zAsyncProcess._check_closedc          	   C   sn   t j| jd| j t| | j| j| j| j	| j
fd| _d| j_| j  dd }t| || j	d| _d| j_d S )Nz#AsyncProcess %s watch message queue)r&   r'   r   Tc             S   s   |  ddi d S )Nopstop)
put_nowait)qr   r   r   stop_threadg   s    z0AsyncProcess._start_threads.<locals>.stop_thread)rC   F)	threadingThread_watch_message_queuer'   weakrefrefr/   r,   r+   r4   r5   Z_watch_message_threaddaemonstartr   
_finalizeratexit)r9   rD   r   r   r   r8   ^   s    
zAsyncProcess._start_threadsc             C   s*   d | _ | jd k	r| |  | j| d S )N)r/   r6   r5   r   )r9   r#   r   r   r   _on_exito   s    

zAsyncProcess._on_exitc                s*    fdd}t j|d}d|_|  dS )zP
        Immediately exit the process when parent_alive_pipe is closed.
        c                  s8   y    W n tk
r*   td Y n
X tdd S )Nz'unexpected state: should be unreachable)ZrecvEOFErroros_exitr   r   )r:   r   r   monitor_parent{   s
    
z@AsyncProcess._immediate_exit_when_closed.<locals>.monitor_parent)r&   TN)rE   rF   rJ   rK   )clsr:   rS   tr   )r:   r   _immediate_exit_when_closedv   s    z(AsyncProcess._immediate_exit_when_closedc              C   s:   x4t jjj D ]"} xt | jD ]}|  q"W qW dS )zw Python 2's logger's locks don't survive a fork event

        https://github.com/dask/distributed/issues/1491
        N)loggingZLoggerZmanagerZ
loggerDictkeys	getLoggerZhandlersZ
createLock)r'   Zhandlerr   r   r   reset_logger_locks   s    zAsyncProcess.reset_logger_locksc             C   s2   |    |  | | dt _||| d S )NZ
MainThread)rZ   closerV   rE   Zcurrent_threadr'   )rT   r&   r   r   r:   r-   r   r   r   r.      s
    

zAsyncProcess._runc       
         s   t   j  fdd}xt }td|f  |d }	|	dkrft||d | q*|	dkrt||d j q*|	dkrP q*d	s*t|q*W d S )
Nc                 sZ      tjtjd  fd} d| _|    d_j_t	djf  d S )Nz"AsyncProcess %s watch process join)r&   r'   r   Tz [%s] created process with pid %r)
rK   rE   rF   r$   _watch_processrJ   r!   r"   loggerdebug)Zthread)r'   processrC   rselfrefstater   r   _start   s    z1AsyncProcess._watch_message_queue.<locals>._startz[%s] got message %rr@   rK   r   	terminaterA   r   )reprr'   getr]   r^   r   rd   AssertionError)
rT   ra   r_   r   rb   rC   Zexit_futurerc   msgr@   r   )r'   r_   rC   r`   ra   rb   r   rG      s    
z!AsyncProcess._watch_message_queuec             C   sp   t | }|  |j}|d k	s$ttd||j| d|_||_| }z|d k	rbt|j	|j
| W d d }X d S )Nz#[%s] process %r exited with code %rF)re   joinr#   rg   r]   r^   r"   r!   r   r,   rN   )rT   ra   r_   rb   rC   r`   r#   r9   r   r   r   r\      s    
zAsyncProcess._watch_processc             C   s$   |    t }| jd|d |S )zO
        Start the child process.

        This method is a coroutine.
        rK   )r@   r   )r?   r   r4   rB   )r9   futr   r   r   rK      s    zAsyncProcess.startc             C   s$   |    t }| jd|d |S )zS
        Terminate the child process.

        This method is a coroutine.
        rd   )r@   r   )r?   r   r4   rB   )r9   rj   r   r   r   rd      s    zAsyncProcess.terminatec             c   st   |    | jjdk	std| jjdk	r,dS |dkr>| jV  n2ytt|d| jV  W n tj	k
rn   Y nX dS )zZ
        Wait for the child process to exit.

        This method is a coroutine.
        Nzcan only join a started process)Zseconds)
r?   r+   r"   rg   r#   r5   r
   Zwith_timeoutr   TimeoutError)r9   Ztimeoutr   r   r   ri      s    
zAsyncProcess.joinc             C   s   | j s|   d| _d| _ dS )z
        Stop helper thread and release resources.  This method returns
        immediately and does not ensure the child process has exited.
        NT)r7   rL   r/   )r9   r   r   r   r[     s    zAsyncProcess.closec             C   s.   t |std| jjdks$td|| _dS )z
        Set a function to be called by the event loop when the process exits.
        The function is called with the AsyncProcess as sole argument.

        The function may be a coroutine function.
        z exit callback should be callableNz5cannot set exit callback when process already started)r(   rg   r+   r"   r6   )r9   r   r   r   r   set_exit_callback  s    zAsyncProcess.set_exit_callbackc             C   s   | j jS )N)r+   r!   )r9   r   r   r   r!   %  s    zAsyncProcess.is_alivec             C   s   | j jS )N)r+   r"   )r9   r   r   r   r"   (  s    zAsyncProcess.pidc             C   s   | j jS )N)r+   r#   )r9   r   r   r   r#   ,  s    zAsyncProcess.exitcodec             C   s   | j S )N)r2   )r9   r   r   r   r'   0  s    zAsyncProcess.namec             C   s   | j jS )N)r/   rJ   )r9   r   r   r   rJ   4  s    zAsyncProcess.daemonc             C   s   || j _d S )N)r/   rJ   )r9   valuer   r   r   rJ   8  s    )N)r   r   r    __doc__r;   r=   r?   r8   rN   classmethodrV   staticmethodrZ   r.   rG   r\   rK   rd   r
   	coroutineri   r[   rl   r!   propertyr"   r#   r'   rJ   setterr   r   r   r   r$   2   s.   
#
r$   c           	   C   sV   xPt tD ]D} | jr
|  r
ytd| f  |   W q
 tk
rL   Y q
X q
W d S )Nzreaping stray process %s)listr0   rJ   r!   r]   Zwarningrd   OSError)procr   r   r   _cleanup_dangling@  s    rw   )$Z
__future__r   r   r   rM   Zdatetimer   rW   rQ   r   rE   rH   Zcompatibilityr   r   r3   Zutilsr	   Ztornador
   Ztornado.concurrentr   Ztornado.ioloopr   rY   r   r]   r   r   objectr   r$   WeakSetr0   registerrw   r   r   r   r   <module>   s*   
  