ó
ßüÚ\c           @` sF  d  d l  m Z m Z m Z d  d l m Z d  d l m Z d  d l Z d  d l	 m
 Z
 d  d l m Z d  d l m Z d  d l m Z m Z m Z m Z d	 d
 l m Z d	 d l m Z m Z e
 j e d d d   Z d e f d     YZ d  g Z e
 j e e d d   Z  e! e" e# e$ f Z% e& d d  Z' e d  Z( d S(   i    (   t   print_functiont   divisiont   absolute_import(   t   defaultdict(   t   cycleN(   t   gen(   t   Return(   t   SubgraphCallable(   t   merget   concatt   groupbyt   dropi   (   t   rpc(   t   Allt   tokeyc         #` sX  d d l  m   t   } t   } |   d   |  j   D }  t   } t   } xĐt |  t |  t |   k  r"t t  }	 t   }
 t   } x |  j   D]w \ } } | | k rž q  n  y8 t j	 t | |   } |	 | j
 |  | |
 | <Wq  t k
 r| j |  q  Xq  W| r.| | O} n   f d   |	 D } z      f d   |	 j   D } i   xW | j   D]I \ } } y | V} Wn t k
 rś| j |  qX j | d  qWWd x | j   D] } | j   qÝWX|  f d   |
 j   D O} | j   qS W f d	   | D } t | | t |  f   d S(
   s   Gather data directly from peers

    Parameters
    ----------
    who_has: dict
        Dict mapping keys to sets of workers that may have that key
    rpc: callable

    Returns dict mapping key to value

    See Also
    --------
    gather
    _gather
    i   (   t   get_data_from_workerc         S` s%   i  |  ] \ } } t  |  |  q S(    (   t   set(   t   .0t   kt   v(    (    s5   lib/python2.7/site-packages/distributed/utils_comm.pys
   <dictcomp>'   s   	 c         ` s   i  |  ] }   |  |  q S(    (    (   R   t   addr(   R   (    s5   lib/python2.7/site-packages/distributed/utils_comm.pys
   <dictcomp>;   s   	 c         ` s=   i  |  ]3 \ } }    | | d   d  d t  |  q S(   t   whot   serializerst   max_connections(   t   False(   R   t   addresst   keys(   R   R   R   R   (    s5   lib/python2.7/site-packages/distributed/utils_comm.pys
   <dictcomp>>   s   	t   dataNc         ` s(   h  |  ] \ } } |   k r |  q S(    (    (   R   R   R   (   t   response(    s5   lib/python2.7/site-packages/distributed/utils_comm.pys	   <setcomp>T   s   	 c         ` s#   i  |  ] } t    |  |  q S(    (   t   list(   R   R   (   t   original_who_has(    s5   lib/python2.7/site-packages/distributed/utils_comm.pys
   <dictcomp>W   s   	 (   t   workerR   R   t   itemst   dictt   lenR   R   t   randomt   choicet   appendt
   IndexErrort   addt   EnvironmentErrort   updatet   valuest	   close_rpcR   (   t   who_hasR   t   closeR   R   t   bad_addressest   missing_workerst   resultst   all_bad_keyst   dt   revt   bad_keyst   keyt	   addressesR   t   rpcst
   coroutinesR   t   ct   r(    (   R   R   R   R   R   R   s5   lib/python2.7/site-packages/distributed/utils_comm.pyt   gather_from_workers   sN    				%		 t
   WrappedKeyc           B` s    e  Z d  Z d   Z d   Z RS(   s   Interface for a key in a dask graph.

    Subclasses must have .key attribute that refers to a key in a dask graph.

    Sometimes we want to associate metadata to keys in a dask graph.  For
    example we might know that that key lives on a particular machine or can
    only be accessed in a certain way.  Schedulers may have particular needs
    that can only be addressed by additional metadata.
    c         C` s   | |  _  d  S(   N(   R5   (   t   selfR5   (    (    s5   lib/python2.7/site-packages/distributed/utils_comm.pyt   __init__f   s    c         C` s   d t  |   j |  j f S(   Ns   %s('%s')(   t   typet   __name__R5   (   R=   (    (    s5   lib/python2.7/site-packages/distributed/utils_comm.pyt   __repr__i   s    (   R@   t
   __module__t   __doc__R>   RA   (    (    (    s5   lib/python2.7/site-packages/distributed/utils_comm.pyR<   [   s   		c      
   #` sľ  t  |  t  s t  t  | t  s* t  t t d   |  j   D   } t t | j      \ } } t t d t	 |  t
 |   } t d c t	 |  7<t t | | |   } t d |  }	 d   |	 j   D }	   f d   |	 D }
 zL t g  |	 j   D]. \ } } |
 | j d | d | d |  ^ q	 V} Wd x |
 j   D] } | j   qRWXt d	   | D  } d
   t d |  j   D } t | | | f   d S(   s   Scatter data directly to workers

    This distributes data in a round-robin fashion to a set of workers based on
    how many cores they have.  ncores should be a dictionary mapping worker
    identities to numbers of cores.

    See scatter for parameter docstring
    c         s` s"   |  ] \ } } | g | Vq d  S(   N(    (   R   t   wt   nc(    (    s5   lib/python2.7/site-packages/distributed/utils_comm.pys	   <genexpr>}   s    i    c         S` s)   i  |  ] \ } } d    | D |  q S(   c         S` s"   i  |  ] \ } } } | |  q S(    (    (   R   t   _R5   t   value(    (    s5   lib/python2.7/site-packages/distributed/utils_comm.pys
   <dictcomp>   s   	 (    (   R   R   R   (    (    s5   lib/python2.7/site-packages/distributed/utils_comm.pys
   <dictcomp>   s   	 c         ` s   i  |  ] }   |  |  q S(    (    (   R   R   (   R   (    s5   lib/python2.7/site-packages/distributed/utils_comm.pys
   <dictcomp>   s   	 R   t   reportR   Nc         s` s   |  ] } | d  Vq d S(   t   nbytesN(    (   R   t   o(    (    s5   lib/python2.7/site-packages/distributed/utils_comm.pys	   <genexpr>   s    c         S` s;   i  |  ]1 \ } } g  | D] \ } } } | ^ q |  q S(    (    (   R   R   R   RD   RF   (    (    s5   lib/python2.7/site-packages/distributed/utils_comm.pys
   <dictcomp>   s   	 i   (   t
   isinstanceR!   t   AssertionErrorR   R	   R    t   zipR   t   _round_robin_counterR"   R   R
   R   t   update_dataR*   R+   R   R   (   t   ncoresR   R   RH   R   t   workerst   namest   worker_itert   LR2   R7   R   R   t   outR:   RI   R,   (    (   R   s5   lib/python2.7/site-packages/distributed/utils_comm.pyt   scatter_to_workersp   s&    
"#Fc         ` sF   d k r1 t    t |      } |  f St |   } | t k r^|  sS |  St |  d  t k r>|  d } t       f d   | j j   D } t    f d   |  d D  }  r7 j     rđ t d    D  n t d    D   | j	  } t | | j
 | | j  f |  S|  Sq^t    f d   |  D  Sn  | t k rŁ|  st|  Sg  |  D] }	 t |	     ^ q{}
 | |
  S| t k r |  růg  |  j   D] } t |     ^ qÂ} t t |  j   |   S|  SnB t | t  r>|  j }   r-t |  } n   j |   | S|  Sd S(	   ső   Unpack WrappedKey objects from collection

    Returns original collection and set of all found WrappedKey objects

    Examples
    --------
    >>> rd = WrappedKey('mykey')
    >>> unpack_remotedata(1)
    (1, set())
    >>> unpack_remotedata(())
    ((), set())
    >>> unpack_remotedata(rd)
    ('mykey', {WrappedKey('mykey')})
    >>> unpack_remotedata([1, rd])
    ([1, 'mykey'], {WrappedKey('mykey')})
    >>> unpack_remotedata({1: rd})
    ({1: 'mykey'}, {WrappedKey('mykey')})
    >>> unpack_remotedata({1: [rd]})
    ({1: ['mykey']}, {WrappedKey('mykey')})

    Use the ``byte_keys=True`` keyword to force string keys

    >>> rd = WrappedKey(('x', 1))
    >>> unpack_remotedata(rd, byte_keys=True)
    ("('x', 1)", {WrappedKey('('x', 1)')})
    i    c         ` s+   i  |  ]! \ } } t  |     |  q S(    (   t   unpack_remotedata(   R   R   R   (   t	   byte_keyst   futures(    s5   lib/python2.7/site-packages/distributed/utils_comm.pys
   <dictcomp>Č   s   	 c         3` s!   |  ] } t  |     Vq d  S(   N(   RW   (   R   t   i(   RX   RY   (    s5   lib/python2.7/site-packages/distributed/utils_comm.pys	   <genexpr>Ę   s    i   c         s` s   |  ] } t  | j  Vq d  S(   N(   R   R5   (   R   t   f(    (    s5   lib/python2.7/site-packages/distributed/utils_comm.pys	   <genexpr>Î   s    c         s` s   |  ] } | j  Vq d  S(   N(   R5   (   R   R[   (    (    s5   lib/python2.7/site-packages/distributed/utils_comm.pys	   <genexpr>Đ   s    c         3` s!   |  ] } t  |     Vq d  S(   N(   RW   (   R   t   item(   RX   t   myset(    s5   lib/python2.7/site-packages/distributed/utils_comm.pys	   <genexpr>Ű   s    N(   t   NoneR   RW   R?   t   tupleR   t   dskR    R)   t   inkeyst   outkeyt   namet   collection_typesR!   R*   RM   R   t
   issubclassR<   R5   R   R'   (   RJ   RX   R]   RU   t   typt   scR`   t   argsRa   R\   t   outsR   R*   R   (    (   RX   RY   R]   s5   lib/python2.7/site-packages/distributed/utils_comm.pyRW      sL    	

	"#$ %
+	c         ` s°   t  |   } y' t |    r2 |    k r2   |  SWn t k
 rF n X| t k r | g  |  D] } t |   d  ^ q]  S| t k r¨    f d   |  j   D S|  Sd S(   sË   Merge known data into tuple or dict

    Parameters
    ----------
    o:
        core data structures containing literals and keys
    d: dict
        mapping of keys to data

    Examples
    --------
    >>> data = {'x': 1}
    >>> pack_data(('x', 'y'), data)
    (1, 'y')
    >>> pack_data({'a': 'x', 'b': 'y'}, data)  # doctest: +SKIP
    {'a': 1, 'b': 'y'}
    >>> pack_data({'a': ['x'], 'b': 'y'}, data)  # doctest: +SKIP
    {'a': [1], 'b': 'y'}
    t	   key_typesc         ` s.   i  |  ]$ \ } } t  |   d   |  q S(   Rj   (   t	   pack_data(   R   R   R   (   R2   Rj   (    s5   lib/python2.7/site-packages/distributed/utils_comm.pys
   <dictcomp>  s   	 N(   R?   RK   t	   TypeErrorRd   Rk   R!   R    (   RJ   R2   Rj   Rf   t   x(    (   R2   Rj   s5   lib/python2.7/site-packages/distributed/utils_comm.pyRk   ń   s    ,()   t
   __future__R    R   R   t   collectionsR   t	   itertoolsR   R#   t   tornadoR   t   tornado.genR   t   dask.optimizationR   t   toolzR   R	   R
   R   t   coreR   t   utilsR   R   t	   coroutinet   TrueR^   R;   t   objectR<   RN   RV   R_   R   R   t	   frozensetRd   R   RW   Rk   (    (    (    s5   lib/python2.7/site-packages/distributed/utils_comm.pyt   <module>   s$   "I	+R