σ
ίόΪ\c           @` sΔ   d  d l  m Z m Z m Z d  d l j Z d  d l Z d  d l Z d  d l	 m
 Z
 d  d l m Z d d l m Z d d l m Z e j d    Z e j d	    Z d
 e j f d     YZ d S(   i    (   t   print_functiont   divisiont   absolute_importN(   t   merge(   t   geni   (   t   time(   t   syncc         c` s   |  j  d t  V} |  j } | d k r8 | j |  nW | d k r[ | j   | j   n4 y t j |   Wn  t k
 r } | j	 |  n Xd S(   s[   
    Coroutine that waits on Dask future, then transmits its outcome to
    cf_future.
    t   raiseitt   finishedt	   cancelledN(
   t   _resultt   Falset   statust
   set_resultt   cancelt   set_running_or_notify_cancelt   sixt   reraiset   BaseExceptiont   set_exception(   t   futuret	   cf_futuret   resultR   t   exc(    (    s5   lib/python2.7/site-packages/distributed/cfexecutor.pyt   _cascade_future   s    	
c         c` s2   x+ |  D]# } y	 | VWq t  k
 r) q Xq Wd  S(   N(   t	   Exception(   t   futurest   fut(    (    s5   lib/python2.7/site-packages/distributed/cfexecutor.pyt   _wait_on_futures%   s
    	t   ClientExecutorc           B` sY   e  Z d  Z e d d d d d g  Z d   Z d   Z d   Z d	   Z e	 d
  Z
 RS(   sY   
    A concurrent.futures Executor that executes tasks on a dask.distributed Client.
    t   puret   workerst	   resourcest   allow_other_workerst   retriesc         K` si   t  |  } | |  j k s; t d t | |  j    n  | |  _ t j   |  _ t |  _	 | |  _
 d  S(   Ns+   unsupported arguments to ClientExecutor: %s(   t   sett   _allowed_kwargst	   TypeErrort   sortedt   _clientt   weakreft   WeakSett   _futuresR   t	   _shutdownt   _kwargs(   t   selft   clientt   kwargst   sk(    (    s5   lib/python2.7/site-packages/distributed/cfexecutor.pyt   __init__7   s    		c         ` sE   t  j   }   f d   } | j |  |  j j j t   |  | S(   sK   
        Wrap a distributed Future in a concurrent.futures Future.
        c         ` s,   |  j    r(   j d k r(   j   n  d  S(   NR	   (   R	   R   R   (   R   (   R   (    s5   lib/python2.7/site-packages/distributed/cfexecutor.pyt   cf_callbackJ   s    (   t   cft   Futuret   add_done_callbackR'   t   loopt   add_callbackR   (   R-   R   R   R2   (    (   R   s5   lib/python2.7/site-packages/distributed/cfexecutor.pyt   _wrap_futureC   s
    c         O` sY   |  j  r t d   n  |  j j | | t |  j |   } |  j j |  |  j |  S(   s/  Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as ``fn(*args, **kwargs)``
        and returns a Future instance representing the execution of the callable.

        Returns
        -------
        A Future representing the given call.
        s*   cannot schedule new futures after shutdown(	   R+   t   RuntimeErrorR'   t   submitR   R,   R*   t   addR8   (   R-   t   fnt   argsR/   R   (    (    s5   lib/python2.7/site-packages/distributed/cfexecutor.pyR:   S   s
    
	$c         ` s   | j  d d    d k	 r.  t     n  d | k rD | d =n  | rc t d t |    n   j j | |  j        f d   } |   S(   sχ  Returns an iterator equivalent to ``map(fn, *iterables)``.

        Parameters
        ----------
        fn: A callable that will take as many arguments as there are
            passed iterables.
        iterables: One iterable for each parameter to *fn*.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        chunksize: ignored.

        Returns
        -------
        An iterator equivalent to: ``map(fn, *iterables)`` but the calls may
        be evaluated out-of-order.

        Raises
        ------
        TimeoutError: If the entire result iterator could not be generated
            before the given timeout.
        Exception: If ``fn(*args)`` raises for any values.
        t   timeoutt	   chunksizes!   unexpected arguments to map(): %sc          3` s½   zx xq  D]i }   j  j |    d  k	 rh y |  j   t    VWqs t j k
 rd t j  qs Xq
 |  j   Vq
 WWd  t   } x | D] }   j  j |   q W j	 j
 |  Xd  S(   N(   R*   R;   t   NoneR   R   R   t   TimeoutErrorR3   t   listR'   R   (   R   t	   remaining(   t   end_timet   fsR-   R>   (    s5   lib/python2.7/site-packages/distributed/cfexecutor.pyt   result_iterator   s    N(   t   popR@   R   R%   R&   R'   t   mapR,   (   R-   R<   t	   iterablesR/   RF   (    (   RD   RE   R-   R>   s5   lib/python2.7/site-packages/distributed/cfexecutor.pyRH   c   s    
c         C` sW   |  j  sS t |  _  t |  j  } | r@ t |  j j t |  qS |  j j |  n  d S(   s  Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Parameters
        ----------
        wait: If True then shutdown will not return until all running
            futures have finished executing.  If False then all running
            futures are cancelled immediately.
        N(	   R+   t   TrueRB   R*   R   R'   R6   R   R   (   R-   t   waitRE   (    (    s5   lib/python2.7/site-packages/distributed/cfexecutor.pyt   shutdown   s    		(   t   __name__t
   __module__t   __doc__t	   frozensetR$   R1   R8   R:   RH   RJ   RL   (    (    (    s5   lib/python2.7/site-packages/distributed/cfexecutor.pyR   .   s   				6(   t
   __future__R    R   R   t   concurrent.futuresR   R3   R(   R   t   toolzR   t   tornadoR   t   metricsR   t   utilsR   t	   coroutineR   R   t   ExecutorR   (    (    (    s5   lib/python2.7/site-packages/distributed/cfexecutor.pyt   <module>   s   	