B
    t\5                 @   s   d dl mZmZmZ d dlZd dlZd dlZd dlZd dlZd dl	Z	yd dl
mZ W n  ek
rt   d dlmZ Y nX d dlmZ d dlmZ d dlmZ eeZG dd dZd	d
 ZdddZdddZG dd deZdS )    )print_functiondivisionabsolute_importN)Queue)Thread)merge)genc               @   s,   e Zd ZdZdZdZdZdZdZdZ	dZ
d	S )
bcolorsz[95mz[94mz[92mz[93mz[91mz[0mz[1mz[4mN)__name__
__module____qualname__ZHEADERZOKBLUEZOKGREENWARNINGFAILENDCBOLDZ	UNDERLINE r   r   5lib/python3.7/site-packages/distributed/deploy/ssh.pyr	      s   r	   c       
   
      s^  dd l }ddlm  ddlm}m} | }||  d}x"y@t	
dt	j |jd d d d d	d
d
d P W qB ||fk
r^ } ztdtj djd d d d tj  ttjd t| tj  t  |d7 }|dkr"tdtj d tj  td tdtj dj|dd tj  td W d d }~X Y qBX qBW tdjd d d |jdd  d d	d\}jd  fdd fd d!fd"d#}x(d$  rtd% | rP qW t }	x6t |	d& k rHd' | r:P td% qW   |  d S )(Nr   )PipeTimeout)SSHExceptionPasswordRequiredExceptionparamikoaddressssh_usernamessh_portssh_private_keyT   )ZhostnameZusernameportZkey_filenamecompresstimeoutZbanner_timeoutz[ dask-ssh ] : zCSSH connection error when connecting to {addr}:{port}to run '{cmd}'cmd)addrr   r   z,               SSH reported this exception:       z/SSH connection failed after 3 retries. Exiting.z               z!Retrying... (attempt {n}/{total}))nZtotalz[ {label} ] : {cmd}label)r$   r   z$SHELL -i -c '')Zget_ptyg?c           	      sv   yV  } xHt| dkrR|  } td|  d djd | d   } qW W n  tjfk
rp   Y nX dS )z<
        Read stdout stream, time out if necessary.
        r   zstdout from ssh channel: %soutput_queuez[ {label} ] : {output}r$   )r$   outputN)	readlinelenrstriploggerdebugputformatsocketr   )line)r   cmd_dictstdoutr   r   read_from_stdoutn   s    
z#async_ssh.<locals>.read_from_stdoutc           	      s   yl  } x^t| dkrh|  } td|  d djd dtj dj| d tj	    } qW W n  t
jfk
r   Y nX d	S )
z<
        Read stderr stream, time out if necessary.
        r   zstderr from ssh channel: %sr&   z[ {label} ] : r$   )r$   z{output})r'   N)r(   r)   r*   r+   r,   r-   r.   r	   r   r   r/   r   )r0   )r   r1   stderrr   r   read_from_stderr}   s    *z#async_ssh.<locals>.read_from_stderrc                 sV          rR  } d djd dtj d t|  tj  dS dS )zp
        Communicate a little bit, without blocking too long.
        Return True if the command ended.
        r&   z[ {label} ] : r$   )r$   z'remote process exited with exit status TN)Zexit_status_readyZrecv_exit_statusr-   r.   r	   r   strr   )Zexit_status)channelr1   r5   r3   r   r   communicate   s    *zasync_ssh.<locals>.communicateinput_queueg      ?g      @   ) r   Zparamiko.buffered_piper   Zparamiko.ssh_exceptionr   r   Z	SSHClientZset_missing_host_key_policyZAutoAddPolicylogging	getLoggerZsetLevelZWARNZconnectprintr	   r   r.   r   r6   	traceback	print_excos_exittimesleepZexec_commandr7   Z
settimeoutemptysendclose)
r1   r   r   r   ZsshZretriesestdinr8   startr   )r   r7   r1   r5   r3   r4   r2   r   	async_ssh&   sh    
.


 	 


rJ   c          
   C   s   dj |ptj|| d}| d k	rBdj | d| }|dj ||| d7 }tjdj ||d tj }t }	t }
|||||	|
|||d		}tt|gd
}d|_	|
  t|d|iS )Nz8{python} -m distributed.cli.dask_scheduler --port {port})pythonr   logdirzmkdir -p {logdir} && )rL   z,&> {logdir}/dask_scheduler_{addr}:{port}.log)r    r   rL   zscheduler {addr}:{port})r    r   )	r   r$   r   r   r9   r&   r   r   r   )targetargsTthread)r.   sys
executabler	   r   r   r   r   rJ   daemonrI   r   )rL   r    r   r   r   r   remote_pythonr   r$   r9   r&   r1   rO   r   r   r   start_scheduler   s$    rT   distributed.cli.dask_workerc             C   s   d|dkrdnd }|	s |d7 }|
r,|d7 }|r8|d7 }|rD|d7 }|j |pPtj|||||||
||d	
}| d k	rd
j | d| }|dj || d7 }dj |d}t }t }||||||||d}tt|gd}d|_|  t|d|iS )NzX{python} -m {remote_dask_worker} {scheduler_addr}:{scheduler_port} --nthreads {nthreads}r!   z--nprocs {nprocs} z --host {worker_addr} z--memory-limit {memory_limit} z--worker-port {worker_port} z--nanny-port {nanny_port} )
rK   remote_dask_workerscheduler_addrscheduler_portworker_addrnthreadsnprocsmemory_limitworker_port
nanny_portzmkdir -p {logdir} && )rL   z%&> {logdir}/dask_scheduler_{addr}.log)r    rL   zworker {addr})r    )r   r$   r   r9   r&   r   r   r   )rM   rN   TrO   )	r.   rP   rQ   r   r   rJ   rR   rI   r   )rL   rX   rY   rZ   r[   r\   r   r   r   nohostr]   r^   r_   rS   rW   r   r$   r9   r&   r1   rO   r   r   r   start_worker   sH    ra   c               @   sX   e Zd ZdddZejd	d
 Zedd Zdd Z	dd Z
dd Zdd Zdd ZdS )
SSHClusterr   r!   N   Fdistributed.cli.dask_workerc             C   s   || _ || _|| _|| _|| _|| _|| _|	| _|| _|| _	|| _
|| _|| _dd l}|
d k	rtj|
d|j d }
ttjdj|
d tj  |
| _g | _t|
||||||| _g | _xt|D ]\}}| | qW d S )Nr   z	dask-ssh_z%Y-%m-%d_%H:%M:%SzaOutput will be redirected to logfiles stored locally on individual worker nodes under "{logdir}".)rL   )rX   rY   r[   r\   r   r   r   r`   rS   r]   r^   r_   rW   datetimer@   pathjoinZnowZstrftimer=   r	   r   r.   r   rL   ZthreadsrT   	schedulerworkers	enumerate
add_worker)selfrX   rY   Zworker_addrsr[   r\   r   r   r   r`   rL   rS   r]   r^   r_   rW   re   ir    r   r   r   __init__  s4    
zSSHCluster.__init__c             C   s   d S )Nr   )rl   r   r   r   _startB  s    zSSHCluster._startc             C   s   d| j | jf S )Nz%s:%d)rX   rY   )rl   r   r   r   scheduler_addressF  s    zSSHCluster.scheduler_addressc             C   sl   | j g| j }yDx>x.|D ]&}x |d  s<t|d   qW qW td qW W n tk
rf   Y nX d S )Nr&   g?)rh   ri   rD   r=   getrB   rC   KeyboardInterrupt)rl   all_processesprocessr   r   r   monitor_remote_processesJ  s    
z#SSHCluster.monitor_remote_processesc             C   sL   | j t| j| j| j|| j| j| j| j	| j
| j| j| j| j| j| j d S )N)ri   appendra   rL   rX   rY   r[   r\   r   r   r   r`   r]   r^   r_   rS   rW   )rl   r   r   r   r   rk   ^  s    zSSHCluster.add_workerc             C   s:   | j g| j }x&|D ]}|d d |d   qW d S )Nr9   shutdownrO   )rh   ri   r-   rg   )rl   rs   rt   r   r   r   rw   j  s    
zSSHCluster.shutdownc             C   s   | S )Nr   )rl   r   r   r   	__enter__q  s    zSSHCluster.__enter__c             G   s   |    d S )N)rw   )rl   rN   r   r   r   __exit__t  s    zSSHCluster.__exit__)r   r!   Nrc   NFNNNNNrd   )r
   r   r   rn   r   	coroutinero   propertyrp   ru   rk   rw   rx   ry   r   r   r   r   rb     s       
*rb   )N)NrU   )Z
__future__r   r   r   r;   r/   r@   rP   rB   r>   Zqueuer   ImportErrorZ	threadingr   Ztoolzr   Ztornador   r<   r
   r+   r	   rJ   rT   ra   objectrb   r   r   r   r   <module>   s*   
 
& 
8