B
    Ô4\ÂN  ã               @   sV  d dl mZmZmZ d dlmZ d dlZd dlmZ d dl	Z	d dl
Z
d dlZd dlZd dlZd dlZd dlmZ d dlmZmZ d dlmZ dd	lmZmZmZ dd
lmZmZmZmZ ddlm Z  ddl!m"Z" ddl#m$Z$ ddl%m&Z& ddl'm(Z( ddl)m*Z*m+Z+m,Z,m-Z-m.Z. ddl/m0Z0m1Z1m2Z2m3Z3 e 4e5¡Z6G dd„ de"ƒZ7G dd„ de8ƒZ9dS )é    )Úprint_functionÚdivisionÚabsolute_import)Ú	timedeltaN)ÚEmpty)Úgen)ÚIOLoopÚTimeoutError)ÚEventé   )Úget_address_hostÚget_local_address_forÚunparse_host_port)ÚrpcÚ	RPCClosedÚCommClosedErrorÚcoerce_to_address)Útime)Ú
ServerNode)ÚAsyncProcess)Úenable_proctitle_on_children)ÚSecurity)Úget_ipÚ
mp_contextÚsilence_loggingÚjson_load_robustÚPeriodicCallback)Ú_ncoresÚrunÚparse_memory_limitÚWorkerc                   s  e Zd ZdZdZdZdddddddddddddddddg dddddf‡ fd	d
„	Zdd„ Zej	d,dd„ƒZ
edd„ ƒZedd„ ƒZej	d-dd„ƒZd.dd„Zej	d/dd„ƒZej	d0dd„ƒZej	d1dd„ƒZdd „ Zd!d"„ Zd#d$„ Zej	d%d&„ ƒZed'd(„ ƒZej	d2d*d+„ƒZ‡  ZS )3ÚNannyz A process to manage worker processes

    The nanny spins up Worker processes, watches then, and kills or restarts
    them as necessary.
    Nr   zdask-worker-spaceÚautoTF© c                sÎ  |rt |ƒ}|d | _nF|d kr<tj d¡r<tj d¡| _n"|d krPt|ƒ| _nt||fƒ| _|| _|pjt| _|| _	|| _
|| _|| _|| _|| _|d kržtn|| _|pªi | _|| _tj d¡| _|pÊtƒ | _t| jtƒsÞt‚| j d¡| _| j d¡| _|| _|pt ¡ | _t| j| jd| _ || _!|	| _"|| _#d| _$t%|
| jƒ| _&|rXt'|d || _(| j)| j*| j+| j,| j-dœ}t.t/| ƒj0|f| j| jd	œ|—Ž | j&r¾t1| j2d
| jd}|| j3d< || _4d| _5d S )NÚaddresszscheduler-addressz#distributed.worker.memory.terminateÚworker)Úconnection_argsT)Úlevel)ÚinstantiateÚkillÚrestartÚ	terminater   )Úio_loopr&   éd   )r,   ÚmemoryÚinit)6r   Úscheduler_addrÚdaskZconfigÚgetr   Ú_given_worker_portr   ÚncoresÚ	reconnectÚvalidateÚ	resourcesÚdeath_timeoutÚpreloadÚpreload_argvr    ÚenvÚcontact_addressÚmemory_terminate_fractionr   ÚsecurityÚ
isinstanceÚAssertionErrorZget_connection_argsr&   Zget_listen_argsÚlisten_argsÚ	local_dirr   ÚcurrentÚloopr   Ú	schedulerÚservicesÚnameÚquietÚauto_restartr   Úmemory_limitr   Úsilence_logsr(   r)   r*   Ú_closer   Úsuperr!   Ú__init__r   Úmemory_monitorZperiodic_callbacksÚ_listen_addressÚstatus)ÚselfZscheduler_ipZscheduler_portZscheduler_fileZworker_portr4   rD   rB   rF   rG   rJ   r5   r6   rH   r7   rK   r8   r9   r:   r>   r<   Zlisten_addressZworker_classr;   ÚkwargsZcfgZhandlersZpc)Ú	__class__r#   ú0lib/python3.7/site-packages/distributed/nanny.pyrN   (   s`    




zNanny.__init__c             C   s   d| j | jf S )Nz<Nanny: %s, threads: %d>)Úworker_addressr4   )rR   r#   r#   rU   Ú__repr__m   s    zNanny.__repr__é
   c             c   sr   | j d krd S | j j}|d kr"d S tjtttf}y(tjt|d| j	j
| jd|dV  W n |k
rl   Y nX d S )N)Úseconds)r$   )Zquiet_exceptions)ÚprocessrV   r   r	   r   ÚEnvironmentErrorr   Úwith_timeoutr   rE   Ú
unregister)rR   ÚtimeoutrV   Zallowed_errorsr#   r#   rU   Ú_unregisterp   s    
zNanny._unregisterc             C   s   | j d krd S | j jS )N)rZ   rV   )rR   r#   r#   rU   rV   €   s    zNanny.worker_addressc             C   s   | j d krd S | j jS )N)rZ   Ú
worker_dir)rR   r#   r#   rU   r`   „   s    zNanny.worker_dirc             c   sÂ   |s*| j t| jjƒ| jd t| jƒ| _nPt|tƒr^t	t| jjƒƒ| _| j | j|f| jd n| j || jd t| jƒ| _t
 d| j¡ |  ¡ V }|dkr¬| js¤t‚d| _n
|  ¡ V  |  ¡  dS )z2 Start nanny, start local process, start watching )rA   z        Start Nanny at: %rÚrunningN)Zlistenr   rE   r$   rA   r   Zipr?   Úintr   ÚloggerÚinfor(   rV   r@   rQ   rL   Zstart_periodic_callbacks)rR   Úaddr_or_portZresponser#   r#   rU   Ú_startˆ   s$    




zNanny._startc             C   s   | j  | j|¡ d S )N)rD   Úadd_callbackrf   )rR   re   r#   r#   rU   Ústart§   s    zNanny.starté   c             c   s`   d| _ | jdkrt d¡‚| j ¡ | }| jjd|| j ¡   dV  |  || j ¡  ¡V  dS )z… Kill the local worker process

        Blocks until both the process is down and the scheduler is properly
        informed
        FNÚOKgš™™™™™é?)r^   )rI   rZ   r   ÚReturnrD   r   r)   r_   )rR   Úcommr^   Údeadliner#   r#   rU   r)   ª   s    

z
Nanny.killc             c   s   | j r| j }n | jjd }| jjt|| jƒ }| jdkr¢t| jft	| j
| j| jd| ji| j| j| j| j| j| j| j| j| j| j| jd|f| j| j| j| jd| _d| _| jry t t| jd| j  ¡ ¡V }W n2 tj!k
r   | j"| jdV  t #d	¡‚Y nX n| j  ¡ V }t #|¡‚dS )
zu Start a local worker process

        Blocks until the process is up and the scheduler is properly informed
        r   NÚnanny)r4   rB   rF   Zservice_portsrG   rJ   r5   r7   r6   rK   r8   r9   r:   r>   r<   )Úworker_argsÚworker_kwargsÚworker_start_argsrK   Úon_exitr%   r;   T)rY   )r^   z	timed out)$rP   ZlistenerZbound_addressÚprefixr   r3   rZ   ÚWorkerProcessr0   Údictr4   rB   rF   ZportrG   rJ   r5   r7   r6   rK   r8   r9   r:   r>   r<   Ú_on_exitr    r;   rI   r   r\   r   rh   r	   rL   rk   )rR   rl   Z	start_argZhostÚresultr#   r#   rU   r(   ¹   sN    



zNanny.instantiatec             #   sl   t ƒ }tj‡ fdd„ƒ}yt t|d|ƒ ¡V  W n* tjk
r\   t d¡ t d¡‚Y nX t d¡‚d S )Nc               3   s"   ˆ j d k	rˆ  ¡ V  ˆ  ¡ V  d S )N)rZ   r)   r(   r#   )rR   r#   rU   Ú_ñ   s    

zNanny.restart.<locals>._)rY   z,Restart timed out, returning before finishedz	timed outrj   )	r   r   Ú	coroutiner\   r   r	   rc   Úerrorrk   )rR   rl   r^   Úexecutor_waitrh   rx   r#   )rR   rU   r*   í   s    
zNanny.restartc             C   sŒ   | j dkrdS | jj}|dkr"dS yt |j¡}W n tjk
rH   dS X | ¡ j}|| j }| j	rˆ|| j	krˆt
 dd| j	 ¡ | ¡  dS )zE Track worker's memory.  Restart if it goes above terminate fraction ra   Nz.Worker exceeded %d%% memory budget. Restartingr-   )rQ   rZ   ÚpsutilZProcessÚpidZNoSuchProcessZmemory_infoZrssrJ   r=   rc   Úwarningr+   )rR   rZ   Úprocr.   Zfracr#   r#   rU   rO   ÿ   s    


zNanny.memory_monitorc             C   s   | j d k	o| j jdkS )Nra   )rZ   rQ   )rR   r#   r#   rU   Úis_alive  s    zNanny.is_alivec             O   s   t | f|ž|ŽS )N)r   )rR   ÚargsrS   r#   r#   rU   r     s    z	Nanny.runc          	   c   sž   | j dkršy| jj| jdV  W n, ttfk
rL   | jsH|  ¡ V  d S Y nX y(| j dkrt| jrtt	 
d¡ |  ¡ V  W n" tk
r˜   t	jddd Y nX d S )N)ÚclosingÚclosed)r$   zRestarting workerz1Failed to restart worker after its process exitedT)Úexc_info)rQ   rE   r]   rV   r[   r   r5   rL   rI   rc   r~   r(   Ú	Exceptionrz   )rR   Úexitcoder#   r#   rU   rv     s    




zNanny._on_exitc             C   s   | j o| j jS )N)rZ   r}   )rR   r#   r#   rU   r}   *  s    z	Nanny.pidé   c             c   s   | j dkrt d¡‚d| _ t d| j¡ |  ¡  y| jdk	rJ| j|dV  W n t	k
r`   Y nX d| _| j
 ¡  | j ¡  d| _ t d¡‚dS )z;
        Close the worker process, stop all comms.
        )r‚   rƒ   rj   r‚   zClosing Nanny at %rN)r^   rƒ   )rQ   r   rk   rc   rd   r$   ÚstoprZ   r)   r…   r   ÚcloserE   Z	close_rpc)rR   rl   r^   Úreportr#   r#   rU   rL   .  s    




zNanny._close)rX   )r   )r   )Nri   )N)Nri   T)Nr‡   N)Ú__name__Ú
__module__Ú__qualname__Ú__doc__rZ   rQ   rN   rW   r   ry   r_   ÚpropertyrV   r`   rf   rh   r)   r(   r*   rO   r€   r   rv   r}   rL   Ú__classcell__r#   r#   )rT   rU   r!      s>   ?
3r!   c               @   sx   e Zd Zdd„ Zejdd„ ƒZdd„ Zdd„ Zd	d
„ Z	e
dd„ ƒZdd„ Zejddd„ƒZejdd„ ƒZedd„ ƒZdS )rt   c             C   sF   d| _ || _|| _|| _|| _|| _d | _|| _|| _d | _	d | _
d S )Nr/   )rQ   rK   ro   rp   rq   rr   rZ   r    r;   r`   rV   )rR   ro   rp   rq   rK   rr   r%   r;   r#   r#   rU   rN   F  s    zWorkerProcess.__init__c             c   s2  t ƒ  | jdkrt | j¡‚| jdkr>| j ¡ V  t | j¡‚t ¡  | _}t ¡ | _	t
 ¡ j}t| jt| j| j| j| j| j| j	|| j| jd	d| _d| j_| j | j¡ tƒ | _tƒ | _d| _| j ¡ V  |  |¡V }|sêt | j¡‚|d | _|d | _| js
t ‚d| _| j !¡  | "¡  t | j¡‚dS )	z7
        Ensure the worker process is started.
        ra   Ústarting)	ro   rp   rq   rK   Úinit_result_qÚchild_stop_qÚuidr    r;   )ÚtargetrS   Tr$   ÚdirN)#r   rQ   r   rk   ra   Úwaitr   ZQueuer’   r“   ÚuuidZuuid4Úhexr   Ú_runru   ro   rp   rq   rK   r    r;   rZ   ÚdaemonZset_exit_callbackrv   r
   Ústoppedrh   Ú_wait_until_connectedrV   r`   r@   Úsetr‰   )rR   Zinit_qr”   Úmsgr#   r#   rU   rh   V  sH    






zWorkerProcess.startc             C   s   || j k	rd S |  ¡  d S )N)rZ   Úmark_stopped)rR   r   r#   r#   rU   rv   …  s    
zWorkerProcess._on_exitc             C   sD   |d k	st ‚|dkrd|f S |dkr2d||f S d|| f S d S )Néÿ   z.Worker process %d was killed by unknown signalr   z'Worker process %d exited with status %dz)Worker process %d was killed by signal %d)r@   )rR   r}   r†   r#   r#   rU   Ú_death_message‹  s    
zWorkerProcess._death_messagec             C   s   | j d k	o| j  ¡ S )N)rZ   r€   )rR   r#   r#   rU   r€   ”  s    zWorkerProcess.is_alivec             C   s   | j r| j  ¡ r| j jS d S )N)rZ   r€   r}   )rR   r#   r#   rU   r}   —  s    zWorkerProcess.pidc             C   s®   | j dkrª| jj}|d k	st‚|dkr@|  | jj|¡}t |¡ d| _ | j 	¡  | j 
¡  d | _d | _d | _| jrtj | j¡rtj| jdd d | _| jd k	rª|  |¡ d S )Nrœ   r   T)Úignore_errors)rQ   rZ   r†   r@   r¢   r}   rc   r~   rœ   rž   r‰   r’   r“   r`   ÚosÚpathÚexistsÚshutilZrmtreerr   )rR   ÚrrŸ   r#   r#   rU   r      s"    




zWorkerProcess.mark_stoppedri   Tc          
   c   s  t  ¡ }| ¡ | }| jdkr"dS | jdkr<| j ¡ V  dS | jdksJt‚d| _| j}| j 	dt
d|| ¡  ƒd |dœ¡ | j ¡  x$| ¡ rª| ¡ |k rªt d	¡V  qˆW | ¡ rþt d
|¡ y| ¡ V  W n. tk
rü } zt d|¡ W dd}~X Y nX dS )z
        Ensure the worker process is stopped, waiting at most
        *timeout* seconds before terminating it abruptly.
        rœ   NZstopping)r‘   ra   rˆ   r   gš™™™™™é?)Úopr^   r{   gš™™™™™©?z4Worker process still alive after %d seconds, killingz!Failed to kill worker process: %s)r   rC   r   rQ   rœ   r—   r@   rZ   r“   ÚputÚmaxr‰   r€   r   Úsleeprc   r~   r+   r…   rz   )rR   r^   r{   rD   rm   rZ   Úer#   r#   rU   r)   ³  s0    



zWorkerProcess.killc             c   s   d}x†| j dkrd S y| j ¡ }W n" tk
rD   t |¡V  wY nX |d |krTqd|kr~t d|d ¡ | j 	¡ V  |‚qt 
|¡‚qW d S )Ngš™™™™™©?r‘   r”   Ú	exceptionz/Failed while trying to start worker process: %s)rQ   r’   Z
get_nowaitr   r   r¬   rc   rz   rZ   Újoinrk   )rR   r”   ZdelayrŸ   r#   r#   rU   r   ×  s"    

z#WorkerProcess._wait_until_connectedc
                sú   t j |¡ yddlm}
 W n tk
r0   Y nX |
ƒ  |rFt |¡ t 	¡  tƒ ‰ˆ 
¡  |	||Ž‰tjd‡‡fdd„	ƒ‰‡ ‡‡fdd„}tj|d	d
}d|_| ¡  tj‡‡‡‡fdd„ƒ}yˆ |¡ W n& tk
râ   Y n tk
rô   Y nX d S )Nr   )Úinitialize_worker_processr‡   Tc             3   s(   zˆj dd|| dV  W d ˆ  ¡  X d S )NF)rŠ   rn   r{   r^   )rL   rˆ   )r^   r{   )rD   r%   r#   rU   Údo_stop  s    z#WorkerProcess._run.<locals>.do_stopc                 sZ   xTyˆ j dd} W n tk
r&   Y qX ˆ  ¡  |  d¡dksBt‚ˆjˆf| Ž P qW dS )zi
            Wait for an incoming stop message and then stop the
            worker cleanly.
            iè  )r^   r©   rˆ   N)r2   r   r‰   Úpopr@   rg   )rŸ   )r“   r±   rD   r#   rU   Úwatch_stop_q  s    z(WorkerProcess._run.<locals>.watch_stop_qzNanny stop queue watch)r•   rG   c           
   3   s–   yˆj ˆŽ V  W nD tk
rT }  z&t d¡ ˆ  ˆ| dœ¡ ˆ  ¡  W dd} ~ X Y n>X ˆjs`t‚ˆ  ˆjˆjˆdœ¡ ˆ  ¡  ˆ 	¡ V  t 
d¡ dS )zK
            Try to start worker and inform parent of outcome.
            zFailed to start worker)r”   r®   N)r$   r–   r”   zWorker closed)rf   r…   rc   r®   rª   r‰   r$   r@   rB   Zwait_until_closedrd   )r­   )r’   r”   r%   rq   r#   rU   r     s    



zWorkerProcess._run.<locals>.run)r‡   T)r¤   ÚenvironÚupdateZdask.multiprocessingr°   ÚImportErrorrc   ZsetLevelr   Zclear_instanceZmake_currentr   ry   Ú	threadingZThreadr›   rh   Zrun_syncr	   ÚKeyboardInterrupt)Úclsro   rp   rq   rK   r’   r“   r”   r;   r    r°   r³   Útr   r#   )r“   r±   r’   rD   r”   r%   rq   rU   rš   î  s2    

	zWorkerProcess._runN)ri   T)r‹   rŒ   r   rN   r   ry   rh   rv   r¢   r€   r   r}   r    r)   r   Úclassmethodrš   r#   r#   r#   rU   rt   D  s   /	#rt   ):Z
__future__r   r   r   Zdatetimer   ZloggingZmultiprocessing.queuesr   r¤   r|   r§   r·   r˜   r1   Ztornador   Ztornado.ioloopr   r	   Ztornado.locksr
   rl   r   r   r   Zcorer   r   r   r   Zmetricsr   Znoder   rZ   r   Z	proctitler   r>   r   Zutilsr   r   r   r   r   r%   r   r   r   r    Z	getLoggerr‹   rc   r!   Úobjectrt   r#   r#   r#   rU   Ú<module>   s4   
  '