B
    t\f                  @   s  d dl mZmZmZ d dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
mZ d dlZd dlZd dlZd dlmZmZ d dlmZ d dlm
Z
 d dlmZ d dlmZmZ d	d
 Zdd Zdd Zdd Zdd Zdd Zdd Z dd Z!dd Z"edd Z#dS )    )print_functiondivisionabsolute_importN)sleep)EmptyWINDOWS)	WorkSpace)time)
mp_context)captured_loggerslowc                sB    fdd|D } fddt  D }t|t|ks>td S )Nc                s   g | ]}t j |qS  )ospathjoin).0p)dir_pathr   ?lib/python3.7/site-packages/distributed/tests/test_diskutils.py
<listcomp>   s    z-assert_directory_contents.<locals>.<listcomp>c                s"   g | ]}|d krt j |qS ))zglobal.lockz
purge.lock)r   r   r   )r   r   )r   r   r   r      s   )r   listdirsortedAssertionError)r   ZexpectedZactualr   )r   r   assert_directory_contents   s    
r   c             C   s<  t | }tt|}t|}|g  |jdd}|ddg |jdd}|ddddg |  |ddddg |  |ddg ~t	  |g  |jdd}|jdd}|jdd}||j
|j|j
|j|j
|jh tj|j
dsttj|j
dsttj|j
ds&t|j
|j
ks8td S )	Naa)namez
aa.dirlockbbz
bb.dirlockzfoo-)prefixzbar-)str	functoolspartialr   r   new_work_dir_purge_leftoversreleasegccollectr   Z
_lock_pathr   r   basename
startswithr   )tmpdirbase_dirassert_contentswsabcr   r   r   test_workdir_simple   s2    
r/   c             C   s   t | }tt|}t|}|g  |jdd}|ddg t|}|  |ddg |jdd}|ddddg ~~t  |ddg ~t  |g  d S )Nr   )r   z
aa.dirlockr   z
bb.dirlock)	r   r   r    r   r   r!   r"   r$   r%   )r(   r)   r*   r+   r,   Zws2r-   r   r   r   %test_two_workspaces_in_same_directory?   s$    r0   c       
   	      sN  t | }tt|}t|}dt|d }tjtj	d|gtj
tj
dd  j }  d ksbtt|\}}|||d ||d g |  |||d ||d g      st|||d ||d g tddd	d
}|  W d Q R X |g  |  }	t|	dkstx.||fD ]" t fdd|	D s$tq$W d S )NaT  if 1:
        import signal
        import sys
        import time

        from distributed.diskutils import WorkSpace

        ws = WorkSpace(%(base_dir)r)
        a = ws.new_work_dir(name='aa')
        b = ws.new_work_dir(prefix='foo-')
        print((a.dir_path, b.dir_path))
        sys.stdout.flush()

        time.sleep(100)
        )r)   z-cT)stdinstdoutZuniversal_newlinesz.dirlockzdistributed.diskutilsINFOF)	propagate   c             3   s   | ]}t  |kV  qd S )N)repr)r   line)r   r   r   	<genexpr>   s    z/test_workspace_process_crash.<locals>.<genexpr>)r   r   r    r   r   dict
subprocessPopensys
executablePIPEr2   readlineZpollr   evalr"   killwaitr   getvalue
splitlineslenany)
r(   r)   r*   r+   coder7   Za_pathZb_pathsiolinesr   )r   r   test_workspace_process_crashY   s.    
rJ   c          	   C   s   t | }t|}|jdd}t|j tdddd}|  W d Q R X | 	 }|s^t
x"|D ]}|d|jf sdt
qdW d S )Nr   )r   zdistributed.diskutilsERRORF)r4   zFailed to remove %r)r   r   r!   shutilZrmtreer   r   r#   rC   rD   r   r'   )r(   r)   r+   r,   rH   rI   r7   r   r   r   test_workspace_rmtree_failure   s    
rM   c          
   C   s   t | }tjddi td}tt|}t	|}|g  |j
dd}|dg |j
dd}|ddg |  |ddg |  |dg ~t  |g  W d Q R X |  W d Q R X d S )Nz#distributed.worker.use-file-lockingFz&distributed.diskutils.locket.lock_filer   )r   r   )r   daskZconfigsetmockZpatchr   r    r   r   r!   r"   r#   r$   r%   Zassert_not_called)r(   r)   Z	lock_filer*   r+   r,   r-   r   r   r   test_locking_disabled   s$    

rQ   c       
      C   s   t | }d}tddZ}xR| sjy| }W n, tk
rZ } z|| W d d }~X Y qX |t|7 }qW W d Q R X |  }	|	ryt	dt|	 W n, tk
r } z|| W d d }~X Y nX || d S )Nr   zdistributed.diskutilsrK   zgot %d logs, see stderr)
r   r   Zis_setr"   	ExceptionZputrE   rC   rD   r   )
r)   purged_qerr_qstop_evtr+   n_purgedrH   ZpurgederI   r   r   r   _workspace_concurrency   s     
rX   c          	      sx  t |  t t t t }dd |_tjdkr@dn|} fddt|D }x|D ]}|	  qfW d}d}zVt
 }	xJt
 |	 |k rx.tdD ]"}
|jd	d
}|j  |d7 }qW td qW W d  x|D ]}|  qW X y }W n tk
r   Y nX |yx| 7 }q"W W n tk
rL   Y nX |d|   krjdkspn t||fS )za
    WorkSpace concurrency test.  We merely check that no exception or
    deadlock happens.
    c               S   s   d S )Nr   r   r   r   r   <lambda>   s    z-_test_workspace_concurrency.<locals>.<lambda>Zwin32r5   c                s"   g | ]}t jt fd qS ))targetargs)r
   ZProcessrX   )r   i)r)   rT   rS   rU   r   r   r      s   z/_test_workspace_concurrency.<locals>.<listcomp>r   2   zworkspace-concurrency-)r      g{Gz?Ng      ?)r   r
   ZQueueZEventr   r"   r<   platformrangestartr	   r!   Z
_finalizerdetachr   rO   r   Z
get_nowaitr   r   )r(   ZtimeoutZ	max_procsr+   ZNPROCSZ	processesr   	n_createdrV   Zt1r\   derrr   )r)   rT   rS   rU   r   _test_workspace_concurrency   sH    



"rf   c             C   s    t rtjdt| dd d S )Nz TODO: unknown failure on windowsg       @   )r   pytestZxfailrR   rf   )r(   r   r   r   test_workspace_concurrency  s    ri   c             C   s    t | dd\}}|dkstd S )Ng       @   d   )rf   r   )r(   rc   rV   r   r   r   "test_workspace_concurrency_intense
  s    rl   )$Z
__future__r   r   r   r   r$   r   rL   r:   r<   r	   r   rP   rh   rN   Zdistributed.compatibilityr   r   Zdistributed.diskutilsr   Zdistributed.metricsZdistributed.utilsr
   Zdistributed.utils_testr   r   r   r/   r0   rJ   rM   rQ   rX   rf   ri   rl   r   r   r   r   <module>   s2   !39