B
    °F.\<  ã               @   s’   d dl mZmZmZ d dlmZ d dlZd dlZd dl	m
Z
 d dlmZ ddlmZ ddlmZ ejdd	„ ƒZejd
d„ ƒZG dd„ dejƒZdS )é    )Úprint_functionÚdivisionÚabsolute_importN)Úmerge)Úgené   )Útime)Úsyncc          
   c   s‚   | j ddV }| j}|dkr(| |¡ nV|dkrB| ¡  | ¡  n<ytj|Ž  W n, tk
r| } z| |¡ W dd}~X Y nX dS )z[
    Coroutine that waits on Dask future, then transmits its outcome to
    cf_future.
    F)ZraiseitZfinishedÚ	cancelledN)	Z_resultÚstatusZ
set_resultÚcancelZset_running_or_notify_cancelÚsixZreraiseÚBaseExceptionZset_exception)ÚfutureÚ	cf_futureÚresultr   Úexc© r   ú5lib/python3.7/site-packages/distributed/cfexecutor.pyÚ_cascade_future   s    
r   c          	   c   s2   x,| D ]$}y
|V  W q t k
r(   Y qX qW d S )N)Ú	Exception)ÚfuturesZfutr   r   r   Ú_wait_on_futures%   s
    

r   c               @   sL   e Zd ZdZedddddgƒZdd„ Zd	d
„ Zdd„ Zdd„ Z	ddd„Z
dS )ÚClientExecutorzY
    A concurrent.futures Executor that executes tasks on a dask.distributed Client.
    ZpureZworkersZ	resourcesZallow_other_workersZretriesc             K   sH   t |ƒ}|| jks(tdt|| j ƒ ƒ‚|| _t ¡ | _d| _|| _	d S )Nz+unsupported arguments to ClientExecutor: %sF)
ÚsetÚ_allowed_kwargsÚ	TypeErrorÚsortedÚ_clientÚweakrefÚWeakSetÚ_futuresÚ	_shutdownÚ_kwargs)ÚselfZclientÚkwargsZskr   r   r   Ú__init__5   s    

zClientExecutor.__init__c                s4   t  ¡ }‡ fdd„}| |¡ | jj tˆ |¡ |S )zK
        Wrap a distributed Future in a concurrent.futures Future.
        c                s   |   ¡ rˆ jdkrˆ  ¡  d S )Nr
   )r
   r   r   )r   )r   r   r   Úcf_callbackF   s    z0ClientExecutor._wrap_future.<locals>.cf_callback)ÚcfZFutureZadd_done_callbackr   ÚloopZadd_callbackr   )r$   r   r   r'   r   )r   r   Ú_wrap_future?   s
    
zClientExecutor._wrap_futurec             O   s@   | j rtdƒ‚| jj|f|žt| j|ƒŽ}| j |¡ |  |¡S )a/  Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as ``fn(*args, **kwargs)``
        and returns a Future instance representing the execution of the callable.

        Returns
        -------
        A Future representing the given call.
        z*cannot schedule new futures after shutdown)	r"   ÚRuntimeErrorr   Úsubmitr   r#   r!   Úaddr*   )r$   ÚfnÚargsr%   r   r   r   r   r,   O   s
    
zClientExecutor.submitc                sn   |  dd¡‰ˆdk	rˆtƒ  ‰ d|kr,|d= |r@tdt|ƒ ƒ‚ˆjj|f|žˆjŽ‰‡ ‡‡‡fdd„}|ƒ S )a÷  Returns an iterator equivalent to ``map(fn, *iterables)``.

        Parameters
        ----------
        fn: A callable that will take as many arguments as there are
            passed iterables.
        iterables: One iterable for each parameter to *fn*.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        chunksize: ignored.

        Returns
        -------
        An iterator equivalent to: ``map(fn, *iterables)`` but the calls may
        be evaluated out-of-order.

        Raises
        ------
        TimeoutError: If the entire result iterator could not be generated
            before the given timeout.
        Exception: If ``fn(*args)`` raises for any values.
        ÚtimeoutNZ	chunksizez!unexpected arguments to map(): %sc           
   3   sœ   zfx`ˆD ]X} ˆj  | ¡ ˆd k	rVy|  ˆ tƒ  ¡V  W q` tjk
rR   tj‚Y q`X q|  ¡ V  qW W d tˆƒ}x|D ]} ˆj  | ¡ qvW ˆj 	|¡ X d S )N)
r!   r-   r   r   r   ÚTimeoutErrorr(   Úlistr   r   )r   Z	remaining)Úend_timeÚfsr$   r0   r   r   Úresult_iteratorƒ   s    

z+ClientExecutor.map.<locals>.result_iterator)Úpopr   r   r   r   Úmapr#   )r$   r.   Ú	iterablesr%   r5   r   )r3   r4   r$   r0   r   r7   _   s    
zClientExecutor.mapTc             C   s<   | j s8d| _ t| jƒ}|r,t| jjt|ƒ n| j |¡ dS )aš  Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Parameters
        ----------
        wait: If True then shutdown will not return until all running
            futures have finished executing.  If False then all running
            futures are cancelled immediately.
        TN)r"   r2   r!   r	   r   r)   r   r   )r$   Úwaitr4   r   r   r   Úshutdown–   s    
zClientExecutor.shutdownN)T)Ú__name__Ú
__module__Ú__qualname__Ú__doc__Ú	frozensetr   r&   r*   r,   r7   r:   r   r   r   r   r   .   s   
7r   )Z
__future__r   r   r   Zconcurrent.futuresr   r(   r   r   Ztoolzr   Ztornador   Zmetricsr   Zutilsr	   Ú	coroutiner   r   ZExecutorr   r   r   r   r   Ú<module>   s   	