B
    H/\O                 @   s  d dl mZ d dlZd dlZd dlmZ d dlmZ d dl	m
Z
 d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ ddgZG dd deZG dd deZ G dd de!Z"e" Z"G dd de!Z#dd Z$yd dl%Z&W n e'k
rH   Y ndX e(d d dl)m*Z+ d dl,m-Z- d dl%m.Z/ d d! Z0d"d# Z1G d$d% d%e!Z2G d&d de&j3j4Z4dS )'    )absolute_importN)ref)greenlet)integer_types)_get_hub_noargs)
getcurrent)sleep)_get_hub)AsyncResult)Greenlet)GroupMappingMixin)	Semaphore)Lock)Queue)start_new_thread)get_thread_ident
ThreadPoolThreadResultc               @   s   e Zd Zdd Zdd ZdS )_WorkerGreenletc             C   s2   t | |j t | _t|| _d| _d| j_	d S )NT)
RawGreenlet__init___workerr   thread_identwref_threadpool_wrefZgreenlet_tree_is_rootparentZgreenlet_tree_is_ignored)selfZ
threadpool r   0lib/python3.7/site-packages/gevent/threadpool.pyr   $   s
    
z_WorkerGreenlet.__init__c             C   s   dt | | j|  f S )Nz/<ThreadPoolWorker at 0x%x thread_ident=0x%x %s>)idr   r   )r   r   r   r   __repr__/   s    z_WorkerGreenlet.__repr__N)__name__
__module____qualname__r   r    r   r   r   r   r       s   r   c               @   s   e Zd ZdZd4ddZdd Zdd ZeeeZd	d
 Z	dd Z
dd Zdd ZeeeZdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd  Zd!d" Zd#d$ Zd%Zd&d' Zd(d) Zd*d+ Zd5d,d-Zd.d/ Zd0d1 Zd2d3 ZdS )6r   z
    .. note:: The method :meth:`apply_async` will always return a new
       greenlet, bypassing the threadpool entirely.
    .. caution:: Instances of this class are only true if they have
       unfinished tasks.
    Nc             C   sf   |d krt  }|| _d| _d | _t | _|jjdd| _	y| 
| W n   | j	   Y nX d S )Nr   F)r   )get_hubhub_maxsizemanagerosgetpidpidloopforkfork_watcher_initclose)r   maxsizer%   r   r   r   r   =   s    

zThreadPool.__init__c             C   sd   t |tstd|f |dk r.td|f || j }| j j|7  _|| _|   | j  d S )Nzmaxsize must be integer: %rr   z maxsize must not be negative: %r)	
isinstancer   	TypeError
ValueErrorr&   
_semaphoreZcounteradjustZ_start_notify)r   r0   
differencer   r   r   _set_maxsizeK   s    

zThreadPool._set_maxsizec             C   s   | j S )N)r&   )r   r   r   r   _get_maxsizeW   s    zThreadPool._get_maxsizec          	   C   s8   d| j jt| t| | j| j| jj jt| j| jjf S )Nz8<%s at 0x%x %s/%s/%s hub=<%s at 0x%x thread_ident=0x%s>>)	__class__r!   r   lensizer0   r%   r   )r   r   r   r   r    \   s
    zThreadPool.__repr__c             C   s   | j jS )N)
task_queueunfinished_tasks)r   r   r   r   __len__c   s    zThreadPool.__len__c             C   s   | j S )N)_size)r   r   r   r   	_get_sizei   s    zThreadPool._get_sizec             C   s   |dk rt d|f || jkr2t d|| jf | jrB| j  x| j|k rX|   qDW | jjj}xV| j|krx"| j| | j	j
kr| j	d  qrW t | jkrP t| t|d d}qfW | jr| j| j n
| j  d S )Nr   z'Size of the pool cannot be negative: %rz7Size of the pool cannot be bigger than maxsize: %r > %r   g?)r3   r&   r'   killr?   _add_threadr%   r+   approx_timer_resolutionr<   r=   putr   r   minr-   start_on_forkstop)r   r;   delayr   r   r   	_set_sizel   s&    


zThreadPool._set_sizec             C   s.   d| _ td| _t | _t | _| | d S )Nr      )r?   r   r4   r   _lockr   r<   r7   )r   r0   r   r   r   r.      s
    
zThreadPool._initc             C   s(   t  }|| jkr$|| _| | j d S )N)r(   r)   r*   r.   r&   )r   r*   r   r   r   rH      s    
zThreadPool._on_forkc             C   s<   t d| jjj}x&| jjdkr6t| t|d d}qW dS )z6Waits until all outstanding tasks have been completed.gMb@?r   rA   g?N)maxr%   r+   rD   r<   r=   r   rF   )r   rJ   r   r   r   join   s    zThreadPool.joinc             C   s   d| _ | j  d S )Nr   )r;   r-   r/   )r   r   r   r   rB      s    zThreadPool.killc             C   sr   x&| j | jk r&| jj| j kr&|   qW x$| j | j | jjkrL| jd  q*W | j rd| j| j n
| j	  d S )N)
r?   r&   r<   r=   rC   rE   r-   rG   rH   rI   )r   r   r   r   _adjust_step   s    zThreadPool._adjust_stepc             C   s<   d}x2|    | j| jkrd S t| t|d d}qW d S )Ng-C6?rA   g?)rP   r?   r&   r   rF   )r   rJ   r   r   r   _adjust_wait   s    zThreadPool._adjust_waitc             C   s,   |    | js(| j| jkr(t| j| _d S )N)rP   r'   r?   r&   r   spawnrQ   )r   r   r   r   r5      s    zThreadPool.adjustc             C   sd   | j  |  jd7  _W d Q R X yt| jd W n.   | j  |  jd8  _W d Q R X  Y nX d S )NrL   r   )rM   r?   r   _ThreadPool__trampoline)r   r   r   r   rC      s    zThreadPool._add_threadc             O   s   x| j }|  || j krP qW d}y:| j}t }t|| j|j}|||||f |   W n&   |dk	rv|	  |   Y nX |S )z
        Add a new task to the threadpool that will run ``func(*args, **kwargs)``.

        Waits until a slot is available. Creates a new thread if necessary.

        :return: A :class:`gevent.event.AsyncResult`.
        N)
r4   acquirer<   r
   r   r%   releaserE   r5   destroy)r   funcargskwargsZ	semaphorethread_resultr<   resultr   r   r   rR      s$    
zThreadPool.spawnc          	   C   sB   t d krd S t| dd }|d k	r>| |  jd8  _W d Q R X d S )NrM   rL   )sysgetattrr?   )r   rM   r   r   r   _decrease_size   s    zThreadPool._decrease_sizeTc             C   s    |d k	r|j d k	r|j   d S )N)Zperiodic_monitoring_threadZ ignore_current_greenlet_blocking)r   r%   r   r   r   Z"__ignore_current_greenlet_blocking   s    z-ThreadPool.__ignore_current_greenlet_blockingc             C   s   t | }|  d S )N)r   Zswitch)r   gr   r   r   Z__trampoline   s    zThreadPool.__trampolinec          	   C   s,  d}zxt  }|d k	rd|_| j}| | | }z|d krNd}|   d S |\}}}}zby|||}	W n6   ttdd }
|
d krd S || |f|
  Y nX td krd S |	|	 ~	W d ~~~~~X W d td krd S |
  X qW W d |r|   td k	r&| jr&t  }|d k	r$|d ~X d S )NTzThreadPool Worker HubFexc_info)r	   namer<   -_ThreadPool__ignore_current_greenlet_blockinggetr^   r]   r\   handle_errorsetZ	task_done_destroy_worker_hubrV   )r   Zneed_decreasehr<   ZtaskrW   rX   rY   rZ   valuer`   r%   r   r   r   r      sJ    



zThreadPool._workerc             C   s   |  |||S )z{
        .. deprecated:: 1.1a2
           Identical to :meth:`apply`; the ``expected_errors`` argument is ignored.
        )Zapply)r   Zexpected_errorsfunctionrX   rY   r   r   r   apply_e3  s    zThreadPool.apply_ec             C   s   t  | jk	S )N)r$   r%   )r   r   r   r   _apply_immediatelyA  s    zThreadPool._apply_immediatelyc             C   s   || d S )Nr   )r   callbackr[   r   r   r   _apply_async_cb_spawnI  s    z ThreadPool._apply_async_cb_spawnc             C   s   dS )NTr   )r   r   r   r   _apply_async_use_greenletL  s    z$ThreadPool._apply_async_use_greenlet)N)NN) r!   r"   r#   __doc__r   r7   r8   propertyr0   r    r>   r@   rK   r;   r.   rH   rO   rB   rP   rQ   r5   rC   rR   r^   rf   rb   rS   r   rj   rk   rm   rn   r   r   r   r   r   5   s8   


	
 
3
c               @   s0   e Zd Zdd Ze ZZdd Zdd ZeZdS )
_FakeAsyncc             C   s   d S )Nr   )r   r   r   r   sendR  s    z_FakeAsync.sendc             C   s   dS )zfake out for 'receiver'Nr   )r   r[   r   r   r   Z__call_V  s    z_FakeAsync.__call_c             C   s   dS )NFr   )r   r   r   r   __bool__Y  s    z_FakeAsync.__bool__N)	r!   r"   r#   rr   r/   rI   Z_FakeAsync__call_rs   Z__nonzero__r   r   r   r   rq   P  s
   rq   c               @   sL   e Zd ZdZdd Zedd Zdd Zdd	 Zd
d Z	dd Z
dd ZdS )r   )r`   async_watcher_call_when_readyrh   contextr%   receiverc             C   sB   || _ || _d | _d | _d| _|j | _|| _| j	| j
 d S )Nr   )rw   r%   rv   rh   r`   r+   Zasync_rt   ru   rG   	_on_async)r   rw   r%   Zcall_when_readyr   r   r   r   f  s    zThreadResult.__init__c             C   s   | j r| j d S d S )NrL   )r`   )r   r   r   r   	exceptionp  s    zThreadResult.exceptionc          	   C   s   | j   | j   |   zB| jr:| jj| jf| j  d | _t| _ d | _t| _| 	|  W d t| _	d | _
| jr| jd | jd d f| _X d S )Nr   rL   )rt   rI   r/   ru   r`   r%   rd   rv   rq   rw   rh   )r   r   r   r   rx   t  s    

zThreadResult._on_asyncc             C   s6   | j   | j   t| _ d | _d | _t| _t| _d S )N)rt   rI   r/   rq   rv   r%   ru   rw   )r   r   r   r   rV     s    

zThreadResult.destroyc             C   s   || _ | j  d S )N)rh   rt   rr   )r   rh   r   r   r   re     s    zThreadResult.setc             C   s   || _ || _| j  d S )N)rv   r`   rt   rr   )r   rv   r`   r   r   r   rd     s    zThreadResult.handle_errorc             C   s
   | j d kS )N)ry   )r   r   r   r   
successful  s    zThreadResult.successfulN)r!   r"   r#   	__slots__r   rp   ry   rx   rV   re   rd   rz   r   r   r   r   r   `  s   

c          
   C   s:   yd|||fS  | k
r4 } zd|fS d}~X Y nX dS )zM
    .. deprecated:: 1.1a2
       Previously used by ThreadPool.apply_e.
    TFNr   )errorsri   rX   rY   exr   r   r   wrap_errors  s    r~   ThreadPoolExecutor)Timeout)Lazy)_basec                s    fdd}d|_ |S )Nc                sB   ~ y  W n. t k
r<   jj fft   Y nX d S )N)	Exceptionr%   print_exceptionr\   r`   )_)fnfuturer   r   cbwrap  s
    z_wrap_error.<locals>.cbwrapT)auto_unlink)r   r   r   r   )r   r   r   _wrap_error  s    
r   c                s    fdd}d|_ |S )Nc                s     d S )Nr   )r   )r   r   r   r   f  s    z_wrap.<locals>.fT)r   )r   r   r   r   )r   r   r   _wrap  s    r   c               @   s   e Zd Zdd Zedd Zedd Zdd Zd	e_e	d
d Z
dd ZdddZdddZdd Zdd Zdd Zdd ZdS )_FutureProxyc             C   s
   || _ d S )N)asyncresult)r   r   r   r   r   r     s    z_FutureProxy.__init__c             C   s:   ddl m} |ds|  r.dd l}| S tdd S )Nr   )monkey	threading
_condition)Zgeventr   Zis_module_patcheddoner   Z	ConditionAttributeError)r   r   r   r   r   r   r     s
    z_FutureProxy._conditionc             C   s   | j | j g S )N)r   rawlink_FutureProxy__when_done)r   r   r   r   _waiters  s    z_FutureProxy._waitersc             C   s:   t | d}x*|D ]"}|  r(||  q||  qW d S )Nr   )r]   rz   Z
add_resultZadd_exception)r   r   waiterswr   r   r   Z__when_done  s
    

z_FutureProxy.__when_doneTc             C   s   |   rtjS tjS )N)r   cfbZFINISHEDZRUNNING)r   r   r   r   _state  s    z_FutureProxy._statec             C   s   d S )Nr   )r   r   r   r   set_running_or_notify_cancel  s    z)_FutureProxy.set_running_or_notify_cancelNc             C   s2   y| j j|dS  tk
r,   tj Y nX d S )N)timeout)r   r[   GTimeout
concurrentfuturesTimeoutError)r   r   r   r   r   r[     s    z_FutureProxy.resultc             C   s:   y| j j|d | j jS  tk
r4   tj Y nX d S )N)r   )r   rc   ry   r   r   r   r   )r   r   r   r   r   ry     s
    z_FutureProxy.exceptionc             C   s(   |   r||  n| jt| | d S )N)r   r   r   r   )r   r   r   r   r   add_done_callback	  s    
z_FutureProxy.add_done_callbackc             C   s   | j t| | d S )N)r   r   r   )r   r   r   r   r   r     s    z_FutureProxy.rawlinkc             C   s
   t | jS )N)strr   )r   r   r   r   __str__  s    z_FutureProxy.__str__c             C   s   t | j|S )N)r]   r   )r   ra   r   r   r   __getattr__  s    z_FutureProxy.__getattr__)N)N)r!   r"   r#   r   r   r   r   r   r   rp   r   r   r[   ry   r   r   r   r   r   r   r   r   r     s   


r   c                   sB   e Zd ZdZ fddZdd Zd fdd	ZeZd	d
 Z  Z	S )r   a  
        A version of :class:`concurrent.futures.ThreadPoolExecutor` that
        always uses native threads, even when threading is monkey-patched.

        The ``Future`` objects returned from this object can be used
        with gevent waiting primitives like :func:`gevent.wait`.

        .. caution:: If threading is *not* monkey-patched, then the ``Future``
           objects returned by this object are not guaranteed to work with
           :func:`~concurrent.futures.as_completed` and :func:`~concurrent.futures.wait`.
           The individual blocking methods like :meth:`~concurrent.futures.Future.result`
           and :meth:`~concurrent.futures.Future.exception` will always work.

        .. versionadded:: 1.2a1
           This is a provisional API.
        c                s&   t t| | t|| _d| j_d S )NT)superr   r   r   _threadpoolrf   )r   Zmax_workers)r9   r   r   r   *  s    
zThreadPoolExecutor.__init__c          	   O   s<   | j , | jrtd| jj|f||}t|S Q R X d S )Nz*cannot schedule new futures after shutdown)Z_shutdown_lock	_shutdownRuntimeErrorr   rR   r   )r   r   rX   rY   r   r   r   r   submit/  s
    zThreadPoolExecutor.submitTc                s6   t t| | t| jdd }|r,| j  d | _d S )NrB   )r   r   shutdownr]   r   rB   )r   waitrB   )r9   r   r   r   7  s
    
zThreadPoolExecutor.shutdownc             C   s   d S )Nr   )r   r   r   r   _adjust_thread_countA  s    z'ThreadPoolExecutor._adjust_thread_count)T)
r!   r"   r#   ro   r   r   r   rB   r   __classcell__r   r   )r9   r   r     s   )5Z
__future__r   r\   r(   weakrefr   r   r   r   Zgevent._compatr   Z
gevent.hubr   r$   r   r   r	   Zgevent.eventr
   Zgevent.greenletr   Zgevent.poolr   Zgevent.lockr   Zgevent._threadingr   r   r   r   __all__r   r   objectrq   r   r~   Zconcurrent.futuresr   ImportErrorappendZgevent.timeoutr   r   Zgevent._utilr   r   r   r   r   r   r   r   r   r   r   r   <module>   sN     D

K