ó
ßüÚ\c           @` sÃ   d  d l  m Z m Z m Z d  d l Z d  d l m Z d d l m Z m	 Z	 d d l
 m Z m Z d d l m Z d d l m Z e j e ƒ Z d	 e f d
 „  ƒ  YZ d e f d „  ƒ  YZ d S(   i    (   t   print_functiont   divisiont   absolute_importN(   t   geni   (   t
   futures_oft   wait(   t   synct   tokey(   t	   pack_data(   t   _deserializet   ReplayExceptionSchedulerc           B` s    e  Z d  Z d „  Z d „  Z RS(   s    A plugin for the scheduler to recreate exceptions locally

    This adds the following routes to the scheduler

    *  cause_of_failure
    c         C` s0   | |  _  |  j |  j  j d <|  |  j  j d <d  S(   Nt   cause_of_failuret
   exceptions(   t	   schedulerR   t   handlerst
   extensions(   t   selfR   (    (    s>   lib/python2.7/site-packages/distributed/recreate_exceptions.pyt   __init__   s    	c         O` sÂ   | j  d g  ƒ } x© | D]¡ } t | t ƒ r= t | ƒ } n  t | ƒ } |  j j j | ƒ } | d k	 r | j	 d k	 r | j	 } i g  | j
 D] } | j ^ q d 6| j d 6| j d 6Sq Wd S(   sX  
        Return details of first failed task required by set of keys

        Parameters
        ----------
        keys: list of keys known to the scheduler

        Returns
        -------
        Dictionary with:
        cause: the key that failed
        task: the definition of that key
        deps: keys that the task depends on
        t   keyst   depst   causet   taskN(   t   popt
   isinstancet   listt   tupleR   R   t   taskst   gett   Nonet   exception_blamet   dependenciest   keyt   run_spec(   R   t   argst   kwargsR   R   t   tsR   t   dts(    (    s>   lib/python2.7/site-packages/distributed/recreate_exceptions.pyR      s    	 
(   t   __name__t
   __module__t   __doc__R   R   (    (    (    s>   lib/python2.7/site-packages/distributed/recreate_exceptions.pyR
      s   	t   ReplayExceptionClientc           B` s\   e  Z d  Z d „  Z e d „  ƒ Z e j d „  ƒ Z d „  Z	 e j d „  ƒ Z
 d „  Z RS(   sP  
    A plugin for the client allowing replay of remote exceptions locally

    Adds the following methods (and their async variants)to the given client:

    - ``recreate_error_locally``: main user method
    - ``get_futures_error``: gets the task, its details and dependencies,
        responsible for failure of the given future.
    c         C` sY   | |  _  |  |  j  j d <|  j |  j  _ |  j |  j  _ |  j |  j  _ |  j |  j  _ d  S(   NR   (   t   clientR   t   recreate_error_locallyt   _recreate_error_locallyt   _get_futures_errort   get_futures_error(   R   R)   (    (    s>   lib/python2.7/site-packages/distributed/recreate_exceptions.pyR   E   s    	c         C` s
   |  j  j S(   N(   R)   R   (   R   (    (    s>   lib/python2.7/site-packages/distributed/recreate_exceptions.pyR   N   s    c   
      c` sı   g  t  | ƒ D] } | j d k r | ^ q } | sC t d ƒ ‚ n  |  j j d g  | D] } | j ^ qV ƒ V} | d | d } } t | t ƒ rÆ t |   \ } } }	 t	 j
 | | |	 | f ƒ ‚ n3 t d | ƒ \ } } }	 t	 j
 | | |	 | f ƒ ‚ d  S(   Nt   errors   No errored futures passedR   R   R   (   R   t   statust
   ValueErrorR   R   R   R   t   dictR	   R   t   Return(
   R   t   futuret   ft   futurest   outR   R   t   functionR!   R"   (    (    s>   lib/python2.7/site-packages/distributed/recreate_exceptions.pyR,   R   s    .,c         C` s   |  j  j |  j | ƒ S(   sô  
        Ask the scheduler details of the sub-task of the given failed future

        When a future evaluates to a status of "error", i.e., an exception
        was raised in a task within its graph, we an get information from
        the scheduler. This function gets the details of the specific task
        that raised the exception and led to the error, but does not fetch
        data from the cluster or execute the function.

        Parameters
        ----------
        future : future that failed, having ``status=="error"``, typically
            after an attempt to ``gather()`` shows a stack-stace.

        Returns
        -------

        Tuple:
        - the function that raised an exception
        - argument list (a tuple), may include values and keys
        - keyword arguments (a dictionary), may include values and keys
        - list of keys that the function requires to be fetched to run

        See Also
        --------
        ReplayExceptionClient.recreate_error_locally
        (   R)   R   R,   (   R   R3   (    (    s>   lib/python2.7/site-packages/distributed/recreate_exceptions.pyR-   a   s    c   	      c` s   t  | ƒ V|  j | ƒ V} | \ } } } } |  j j i  | ƒ } |  j j | ƒ V} t | | ƒ } t | | ƒ } t j | | | f ƒ ‚ d  S(   N(   R   R,   R)   t   _graph_to_futurest   _gatherR   R   R2   (	   R   R3   R6   R7   R!   R"   R   R5   t   data(    (    s>   lib/python2.7/site-packages/distributed/recreate_exceptions.pyR+      s    c         C` s5   t  |  j j |  j | ƒ \ } } } | | |   d S(   s“  
        For a failed calculation, perform the blamed task locally for debugging.

        This operation should be performed after a future (result of ``gather``,
        ``compute``, etc) comes back with a status of "error", if the stack-
        trace is not informative enough to diagnose the problem. The specific
        task (part of the graph pointing to the future) responsible for the
        error will be fetched from the scheduler, together with the values of
        its inputs. The function will then be executed, so that ``pdb`` can
        be used for debugging.

        Examples
        --------
        >>> future = c.submit(div, 1, 0)         # doctest: +SKIP
        >>> future.status                        # doctest: +SKIP
        'error'
        >>> c.recreate_error_locally(future)     # doctest: +SKIP
        ZeroDivisionError: division by zero

        If you're in IPython you might take this opportunity to use pdb

        >>> %pdb                                 # doctest: +SKIP
        Automatic pdb calling has been turned ON

        >>> c.recreate_error_locally(future)     # doctest: +SKIP
        ZeroDivisionError: division by zero
              1 def div(x, y):
        ----> 2     return x / y
        ipdb>

        Parameters
        ----------
        future : future or collection that failed
            The same thing as was given to ``gather``, but came back with
            an exception/stack-trace. Can also be a (persisted) dask collection
            containing any errored futures.

        Returns
        -------
        Nothing; the function runs and should raise an exception, allowing
        the debugger to run.
        N(   R   R)   t   loopR+   (   R   R3   t   funcR!   R"   (    (    s>   lib/python2.7/site-packages/distributed/recreate_exceptions.pyR*   Š   s    +!(   R%   R&   R'   R   t   propertyR   R   t	   coroutineR,   R-   R+   R*   (    (    (    s>   lib/python2.7/site-packages/distributed/recreate_exceptions.pyR(   :   s   				(   t
   __future__R    R   R   t   loggingt   tornadoR   R)   R   R   t   utilsR   R   t
   utils_commR   t   workerR	   t	   getLoggerR%   t   loggert   objectR
   R(   (    (    (    s>   lib/python2.7/site-packages/distributed/recreate_exceptions.pyt   <module>   s   -