B
    ^\                  @   s   d dl mZmZmZ d dlmZ d dlmZ d dlZd dl	m
Z
 d dlmZ d dlmZ d dlmZmZmZmZ d	d
lmZ d	dlmZmZ e
jdddZG dd deZd gZe
jeddfddZee e!e"fZ#dddZ$efddZ%dS )    )print_functiondivisionabsolute_import)defaultdict)cycleN)gen)Return)SubgraphCallable)mergeconcatgroupbydrop   )rpc)AlltokeyTc          	   #   s  ddl m  t }t }| dd |  D } t }t }xvt|t| t| k rtt}	t }
t }xj|  D ]^\}}||krqvy,t	t|| }|	| 
| ||
|< W qv tk
r   || Y qvX qvW |r||O }fdd|	D }zv fdd|	 D }i xP| D ]D\}}y
|V }W n  tk
rT   || Y nX |d  q"W W dx| D ]}|  qxW X |fd	d
|
 D O }| q>W fdd|D }t||t|fdS )a   Gather data directly from peers

    Parameters
    ----------
    who_has: dict
        Dict mapping keys to sets of workers that may have that key
    rpc: callable

    Returns dict mapping key to value

    See Also
    --------
    gather
    _gather
    r   )get_data_from_workerc             S   s   i | ]\}}t ||qS  )set).0kvr   r   5lib/python3.7/site-packages/distributed/utils_comm.py
<dictcomp>&   s    z'gather_from_workers.<locals>.<dictcomp>c                s   i | ]} ||qS r   r   )r   addr)r   r   r   r   :   s    c          
      s&   i | ]\}} ||d d|qS )F)whoserializersZmax_connectionsr   )r   addresskeys)r   r   r   r   r   r   r   <   s   dataNc                s   h | ]\}}| kr|qS r   r   )r   r   r   )responser   r   	<setcomp>M   s    z&gather_from_workers.<locals>.<setcomp>c                s   i | ]}t  | |qS r   )list)r   r   )original_who_hasr   r   r   P   s    )workerr   r   itemsdictlenr   r"   randomZchoiceappend
IndexErroraddEnvironmentErrorupdatevalues	close_rpcr   )who_hasr   closer   r   Zbad_addressesZmissing_workersZresultsZall_bad_keysdZrevZbad_keyskeyZ	addressesr   rpcsZ
coroutinesr$   crr   )r   r#   r    r   r   r   r   gather_from_workers   sN    
r7   c               @   s    e Zd ZdZdd Zdd ZdS )
WrappedKeya   Interface for a key in a dask graph.

    Subclasses must have .key attribute that refers to a key in a dask graph.

    Sometimes we want to associate metadata to keys in a dask graph.  For
    example we might know that that key lives on a particular machine or can
    only be accessed in a certain way.  Schedulers may have particular needs
    that can only be addressed by additional metadata.
    c             C   s
   || _ d S )N)r3   )selfr3   r   r   r   __init__^   s    zWrappedKey.__init__c             C   s   dt | j| jf S )Nz%s('%s'))type__name__r3   )r9   r   r   r   __repr__a   s    zWrappedKey.__repr__N)r<   
__module____qualname____doc__r:   r=   r   r   r   r   r8   T   s   	r8   c          	   #   s4  t | tstt |tstttdd |  D }tt|  \}}ttd t	| t
|}td  t	|7  < tt|||}td|}	dd |	 D }	fdd|	D z$t fdd|	 D V }
W d	x D ]}|  qW X td
d |
D }dd td| D }t|||fd	S )a   Scatter data directly to workers

    This distributes data in a round-robin fashion to a set of workers based on
    how many cores they have.  ncores should be a dictionary mapping worker
    identities to numbers of cores.

    See scatter for parameter docstring
    c             s   s   | ]\}}|g| V  qd S )Nr   )r   wZncr   r   r   	<genexpr>u   s    z%scatter_to_workers.<locals>.<genexpr>r   c             S   s    i | ]\}}d d |D |qS )c             S   s   i | ]\}}}||qS r   r   )r   _r3   valuer   r   r   r   }   s    z1scatter_to_workers.<locals>.<dictcomp>.<dictcomp>r   )r   r$   r   r   r   r   r   }   s   z&scatter_to_workers.<locals>.<dictcomp>c                s   i | ]} ||qS r   r   )r   r   )r   r   r   r      s    c                s$   g | ]\}}| j | d qS ))r   reportr   )Zupdate_data)r   r   r   )rE   r4   r   r   r   
<listcomp>   s   z&scatter_to_workers.<locals>.<listcomp>Nc             s   s   | ]}|d  V  qdS )nbytesNr   )r   or   r   r   rB      s    c             S   s    i | ]\}}d d |D |qS )c             S   s   g | ]\}}}|qS r   r   )r   rA   rC   r   r   r   rF      s    z1scatter_to_workers.<locals>.<dictcomp>.<listcomp>r   )r   r   r   r   r   r   r      s    r   )
isinstancer&   AssertionErrorr"   r   r%   zipr   _round_robin_counterr'   r   r   r   r.   r/   r
   r   )Zncoresr   r   rE   r   ZworkersnamesZworker_iterLr2   outr6   rG   r0   r   )rE   r   r4   r   r   scatter_to_workersh   s&    

rP   Fc                s  dkr"t  t|  }|fS t| }|tkr| s<| S t| d tkr| d }t   fdd|j D }t fdd| dd D }r  rtdd D ntd	d D |j }t||j	||j
f|  S | S nt fd
d| D S |tkr>| s"| S  fdd| D }	||	S |tkr~| rx fdd|  D }
tt|  |
S | S n2t|tr| j} rt|}|  |S | S dS )a   Unpack WrappedKey objects from collection

    Returns original collection and set of all found WrappedKey objects

    Examples
    --------
    >>> rd = WrappedKey('mykey')
    >>> unpack_remotedata(1)
    (1, set())
    >>> unpack_remotedata(())
    ((), set())
    >>> unpack_remotedata(rd)
    ('mykey', {WrappedKey('mykey')})
    >>> unpack_remotedata([1, rd])
    ([1, 'mykey'], {WrappedKey('mykey')})
    >>> unpack_remotedata({1: rd})
    ({1: 'mykey'}, {WrappedKey('mykey')})
    >>> unpack_remotedata({1: [rd]})
    ({1: ['mykey']}, {WrappedKey('mykey')})

    Use the ``byte_keys=True`` keyword to force string keys

    >>> rd = WrappedKey(('x', 1))
    >>> unpack_remotedata(rd, byte_keys=True)
    ("('x', 1)", {WrappedKey('('x', 1)')})
    Nr   c                s   i | ]\}}t | |qS r   )unpack_remotedata)r   r   r   )	byte_keysfuturesr   r   r      s   z%unpack_remotedata.<locals>.<dictcomp>c             3   s   | ]}t | V  qd S )N)rQ   )r   i)rR   rS   r   r   rB      s    z$unpack_remotedata.<locals>.<genexpr>r   c             s   s   | ]}t |jV  qd S )N)r   r3   )r   fr   r   r   rB      s    c             s   s   | ]}|j V  qd S )N)r3   )r   rU   r   r   r   rB      s    c             3   s   | ]}t | V  qd S )N)rQ   )r   item)rR   mysetr   r   rB      s    c                s   g | ]}t | qS r   )rQ   )r   rV   )rR   rW   r   r   rF      s    z%unpack_remotedata.<locals>.<listcomp>c                s   g | ]}t | qS r   )rQ   )r   r   )rR   rW   r   r   rF      s    )r   rQ   r;   tupler	   dskr%   r-   inkeysZoutkeynamecollection_typesr&   r.   rK   r   
issubclassr8   r3   r   r+   )rH   rR   rW   rO   typZscrY   argsrZ   Zoutsr.   r   r   )rR   rS   rW   r   rQ      sL    
 
(



rQ   c                s   t | }yt| r$|  kr$ |  S W n tk
r:   Y nX |tkr\| fdd| D S |tkr| fdd|  D S | S dS )a   Merge known data into tuple or dict

    Parameters
    ----------
    o:
        core data structures containing literals and keys
    d: dict
        mapping of keys to data

    Examples
    --------
    >>> data = {'x': 1}
    >>> pack_data(('x', 'y'), data)
    (1, 'y')
    >>> pack_data({'a': 'x', 'b': 'y'}, data)  # doctest: +SKIP
    {'a': 1, 'b': 'y'}
    >>> pack_data({'a': ['x'], 'b': 'y'}, data)  # doctest: +SKIP
    {'a': [1], 'b': 'y'}
    c                s   g | ]}t | d qS ))	key_types)	pack_data)r   x)r2   r`   r   r   rF      s    zpack_data.<locals>.<listcomp>c                s    i | ]\}}t | d |qS ))r`   )ra   )r   r   r   )r2   r`   r   r   r      s    zpack_data.<locals>.<dictcomp>N)r;   rI   	TypeErrorr\   r&   r%   )rH   r2   r`   r^   r   )r2   r`   r   ra      s    ra   )TNN)FN)&Z
__future__r   r   r   collectionsr   	itertoolsr   r(   Ztornador   Ztornado.genr   Zdask.optimizationr	   Ztoolzr
   r   r   r   Zcorer   Zutilsr   r   	coroutiner7   objectr8   rL   rP   rX   r"   r   	frozensetr\   rQ   ra   r   r   r   r   <module>   s$   B'
J