B
    T\                 @   s  d dl mZmZmZ d dlZd dlZd dlmZ ddlm	Z	m
Z
 ddlmZ ddl
mZmZmZmZmZmZmZ ddlmZmZ d	d
 Zdd Zd*ddZdd Zd+ddZd,ddZdd Zdd Zdd Zdd Z dd  Z!d-d!d"Z"d#d$ Z#e$d%Z%d&d' Z&G d(d) d)e'Z(dS ).    )absolute_importdivisionprint_functionN)getitem   )configcore)unicode)istaskget_dependenciessubstoposortflattenreverse_dict
ishashable)addincc                s   t |ttfs|g}g }t }t }ttt|}xp|rg }||7 } fdd|D }|| x:|D ]2\}}	x(|	D ] }
|
|krv||
 ||
 qvW qhW |}q6W  fdd|D }||fS )a   Return new dask with only the tasks required to calculate keys.

    In other words, remove unnecessary tasks from dask.
    ``keys`` may be a single key or list of keys.

    Examples
    --------
    >>> d = {'x': 1, 'y': (inc, 'x'), 'out': (add, 'x', 10)}
    >>> dsk, dependencies = cull(d, 'out')  # doctest: +SKIP
    >>> dsk  # doctest: +SKIP
    {'x': 1, 'out': (add, 'x', 10)}
    >>> dependencies  # doctest: +SKIP
    {'x': set(), 'out': set(['x'])}

    Returns
    -------
    dsk: culled dask graph
    dependencies: Dict mapping {key: [deps]}.  Useful side effect to accelerate
        other optimizations, notably fuse.
    c                s   g | ]}|t  |d dfqS )T)as_list)r   ).0k)dsk 0lib/python3.7/site-packages/dask/optimization.py
<listcomp>-   s   zcull.<locals>.<listcomp>c                s   i | ]} | |qS r   r   )r   r   )r   r   r   
<dictcomp>7   s    zcull.<locals>.<dictcomp>)
isinstancelistsetdictr   updater   append)r   keysZout_keysseendependenciesworknew_workdeps_Zdeplistdoutr   )r   r   cull   s(    



r*   c             C   s   t | d }|tks|tkrLdd | ddd D }|| d  d|S |tkrt| d dkrt| d d ttfrdd | ddd D }|| d d  d|f| d dd  S dS dS )	zCreate new keys for fused tasksr   c             S   s   g | ]}t |qS r   )	key_split)r   xr   r   r   r   @   s    z5default_fused_linear_keys_renamer.<locals>.<listcomp>N-c             S   s   g | ]}t |qS r   )r+   )r   r,   r   r   r   r   E   s    r   )typestrr	   r    jointuplelenr   )r!   typnamesr   r   r   !default_fused_linear_keys_renamer<   s    
r6   Tc                sn  |dk	r.t |ts.t |ts"|g}tt|}|dkrH fdd D }i }t }x D ]|}|| }t|dk}xb|D ]Z}	|dk	r|	|kr||	 qv|	|kr||	= ||	 qv|r||	 qv|	|krv|||	< qvW qXW g }
ttt|	 }x|rv|
 \}	}|	|g}x*||kr4||}||= || qW |  x*|	|krh||	}	||	= ||	 q@W |
| qW dd |	 D }|dkrt}n|dkrd}n|}i }t }t }d}x|
D ]}|dk	r||}|dk	o| ko||k}| }	 |	 }xT|r^| }|| ||	 || |	 t | |	|}||	 |}	qW ||	 |r|||< |||	< ||	 ||< |h||	< ||	 n|||	< qW x( 	 D ]\}}||kr|||< qW |rfxZ|	 D ]N\}}xB||@ D ]6}|| }|| || t|| ||||< qW qW |dk	rfx|| D ]}||= ||= qPW ||fS )a3   Return new dask graph with linear sequence of tasks fused together.

    If specified, the keys in ``keys`` keyword argument are *not* fused.
    Supply ``dependencies`` from output of ``cull`` if available to avoid
    recomputing dependencies.

    **This function is mostly superseded by ``fuse``**

    Parameters
    ----------
    dsk: dict
    keys: list
    dependencies: dict, optional
        {key: [list-of-keys]}.  Must be a list to provide count of each key
        This optional input often comes from ``cull``
    rename_keys: bool or func, optional
        Whether to rename fused keys with ``default_fused_linear_keys_renamer``
        or not.  Renaming fused keys can keep the graph more understandable
        and comprehensive, but it comes at the cost of additional processing.
        If False, then the top-most key will be used.  For advanced usage, a
        func is also accepted, ``new_key = rename_keys(fused_key_list)``.

    Examples
    --------
    >>> d = {'a': 1, 'b': (inc, 'a'), 'c': (inc, 'b')}
    >>> dsk, dependencies = fuse(d)
    >>> dsk # doctest: +SKIP
    {'a-b-c': (inc, (inc, 1)), 'c': 'a-b-c'}
    >>> dsk, dependencies = fuse(d, rename_keys=False)
    >>> dsk # doctest: +SKIP
    {'c': (inc, (inc, 1))}
    >>> dsk, dependencies = fuse(d, keys=['b'], rename_keys=False)
    >>> dsk  # doctest: +SKIP
    {'b': (inc, 1), 'c': (inc, 'b')}

    Returns
    -------
    dsk: output graph with keys fused
    dependencies: dict mapping dependencies after fusion.  Useful side effect
        to accelerate other downstream optimizations.
    Nc                s   i | ]}t  |d d|qS )T)r   )r   )r   r   )r   r   r   r   |   s   zfuse_linear.<locals>.<dictcomp>r   c             S   s   i | ]\}}t ||qS r   )r   )r   r   vr   r   r   r      s    TF)r   r   r   r   r3   r   r   mapreverseditemspopitempopr    reverser6   r   remover   )r   r!   r#   rename_keyschild2parent	unfusibleparentr&   has_many_childrenchildchainsparent2childchainkey_renamerrvZfusedaliasesZ
is_renamedZnew_keyvalkeyZold_keyr   )r   r   fuse_linearL   s    *
















 
rM   c             C   s8   | d krt  S t| t r| S t| tt fs0| g} t | S )N)r   r   r   )r,   r   r   r   	_flat_set   s    
rN   c                sF   r,t tt  tr,dd   D  t|} dkrNfddD  |rp| fdd D  tt	fdd|D  d}i }xX|D ]P}| }x:| | @ D ]*}||kr|| }	n| }	t
|||	}qW |||< qW | }
xL D ]@\}}||
krx&| | @ D ]}t
|||| }qW ||
|< qW |
S )	a   Return new dask with the given keys inlined with their values.

    Inlines all constants if ``inline_constants`` keyword is True. Note that
    the constant keys will remain in the graph, to remove them follow
    ``inline`` with ``cull``.

    Examples
    --------
    >>> d = {'x': 1, 'y': (inc, 'x'), 'z': (add, 'x', 'y')}
    >>> inline(d)  # doctest: +SKIP
    {'x': 1, 'y': (inc, 1), 'z': (add, 1, 'y')}

    >>> inline(d, keys='y')  # doctest: +SKIP
    {'x': 1, 'y': (inc, 1), 'z': (add, 1, (inc, 1))}

    >>> inline(d, keys='y', inline_constants=False)  # doctest: +SKIP
    {'x': 1, 'y': (inc, 1), 'z': (add, 'x', (inc, 'x'))}
    c             S   s   i | ]\}}t ||qS r   )r   )r   r   r7   r   r   r   r      s    zinline.<locals>.<dictcomp>Nc                s   i | ]}t  ||qS r   )r   )r   r   )r   r   r   r      s   c             3   s6   | ].\}}t |r|ks* | st|s|V  qd S )N)r   r
   )r   r   r7   )r#   r   r   r   	<genexpr>   s    zinline.<locals>.<genexpr>c             3   s"   | ]}| kr| | fV  qd S )Nr   )r   r   )r   r   r   rO     s    )r#   )r   nextitervaluesr   r:   rN   r   r   r   r   copy)r   r!   inline_constantsr#   ZreplaceorderZkeysubsrL   rK   Zdepreplacedsk2itemr   )r#   r   r   inline   s4    


rX   Fc                s   sS t t |dkr2fddD }t| fdd fdd D }|rt|||dx|D ]
}|= qzW S )	a   Inline cheap functions into larger operations

    Examples
    --------
    >>> dsk = {'out': (add, 'i', 'd'),  # doctest: +SKIP
    ...        'i': (inc, 'x'),
    ...        'd': (double, 'y'),
    ...        'x': 1, 'y': 1}
    >>> inline_functions(dsk, [], [inc])  # doctest: +SKIP
    {'out': (add, (inc, 'x'), 'd'),
     'd': (double, 'y'),
     'x': 1, 'y': 1}

    Protect output keys.  In the example below ``i`` is not inlined because it
    is marked as an output key.

    >>> inline_functions(dsk, ['i', 'out'], [inc, double])  # doctest: +SKIP
    {'out': (add, 'i', (double, 'y')),
     'i': (inc, 'x'),
     'x': 1, 'y': 1}
    Nc                s   i | ]}t  ||qS r   )r   )r   r   )r   r   r   r   :  s   z$inline_functions.<locals>.<dictcomp>c                s(   yt |  S  tk
r"   dS X d S )NF)functions_ofissubset	TypeError)r7   )fast_functionsr   r   	inlinable>  s    z#inline_functions.<locals>.inlinablec                s4   g | ],\}}t |r | r|kr|r|qS r   )r
   )r   r   r7   )
dependentsr]   outputr   r   r   D  s    z$inline_functions.<locals>.<listcomp>)rT   r#   )r   r   r:   rX   )r   r_   r\   rT   r#   r!   r   r   )r^   r   r\   r]   r_   r   inline_functions  s     


r`   c             C   s   xt | dr| j} qW | S )Nfunc)hasattrra   )ra   r   r   r   unwrap_partialO  s    
rc   c             C   sx   t  }| g}tth}x^|rrg }xL|D ]D} t| |kr$t| r`|t| d  || dd 7 }q$|| 7 }q$W |}qW |S )z Set of functions contained within nested task

    Examples
    --------
    >>> task = (add, (mul, 1, 2), (inc, 3))  # doctest: +SKIP
    >>> functions_of(task)  # doctest: +SKIP
    set([add, mul, inc])
    r   r   N)r   r   r2   r/   r
   r   rc   )ZtaskZfuncsr$   Zsequence_typesr%   r   r   r   rY   U  s    	
rY   c          	   C   s   t  }x|  D ]\}}yht|rv|d |krv|d | krvt| |d  rv| |d  d |krv||| |d  ||< n|||< W q tk
r   |||< Y qX qW |S )a+  Fuse selections with lower operation.

    Handles graphs of the form:
    ``{key1: (head1, key2, ...), key2: (head2, ...)}``

    Parameters
    ----------
    dsk : dict
        dask graph
    head1 : function
        The first element of task1
    head2 : function
        The first element of task2
    merge : function
        Takes ``task1`` and ``task2`` and returns a merged task to
        replace ``task1``.

    Examples
    --------
    >>> def load(store, partition, columns):
    ...     pass
    >>> dsk = {'x': (load, 'store', 'part', ['a', 'b']),
    ...        'y': (getitem, 'x', 'a')}
    >>> merge = lambda t1, t2: (load, t2[1], t2[2], t1[2])
    >>> dsk2 = fuse_selections(dsk, getitem, load, merge)
    >>> cull(dsk2, 'y')[0]
    {'y': (<function load at ...>, 'store', 'part', 'a')}
    r   r   )r   r:   r
   r[   )r   Zhead1Zhead2mergerV   r   r7   r   r   r   fuse_selectionsq  s     $re   c                s   t | t| fddS )a6   Fuse getitem with lower operation

    Parameters
    ----------
    dsk: dict
        dask graph
    func: function
        A function in a task to merge
    place: int
        Location in task to insert the getitem key

    Examples
    --------
    >>> def load(store, partition, columns):
    ...     pass
    >>> dsk = {'x': (load, 'store', 'part', ['a', 'b']),
    ...        'y': (getitem, 'x', 'a')}
    >>> dsk2 = fuse_getitem(dsk, load, 3)  # columns in arg place 3
    >>> cull(dsk2, 'y')[0]
    {'y': (<function load at ...>, 'store', 'part', 'a')}
    c                s.   t |d   | d f t | d d   S )N   r   )r2   )ab)placer   r   <lambda>  s    zfuse_getitem.<locals>.<lambda>)re   r   )r   ra   ri   r   )ri   r   fuse_getitem  s    rk   c             C   s   t | }t|}t|}|tks(|tkrdt|}dd |D }|| t|}|| d	|S |t
krt|dkrt|d ttfrt|}dd |D }|| t|}||d  d	|f|dd  S dS )z"Create new keys for ``fuse`` tasksc             S   s   h | ]}t |qS r   )r+   )r   r   r   r   r   	<setcomp>  s    z-default_fused_keys_renamer.<locals>.<setcomp>r.   r   c             S   s   h | ]}t |qS r   )r+   )r   r   r   r   r   rl     s    r   N)r9   rP   r/   r0   r	   r+   discardsortedr    r1   r2   r3   r   )r!   itZ	first_keyr4   Z
first_namer5   r   r   r   default_fused_keys_renamer  s$    



rp   c	       @   
      s6  |dk	r.t |ts.t |ts"|g}tt|}|dkrXtdddkrLd}ntdd}|dkrtdddkrzt }ntdd}|ptddp|d }|ptddpd|t|d   }|dkrtdd	}|r|s |fS |dkrtd
d}|dkrt	}	n|d	krd}	n|}	|	dk	}|dkrF fdd D }
nt
|}
i }xV|
 D ]J\}}x2|D ]*}||kr|g||< n|| | qjW t||
|< q\W dd | D }|r||8 }|s|rtdd | D r |
fS   }i }g }g }|
j}|j}|j}|j}|j}|j}|j}|j}|j}|j}xj|r| }|| x||krz|| d }q`W || |||
| @  x|d }||kr||
| @ } x,| r||  |}|d }||
| @ } qW |  |||| |r |gnddddd|
| | f q|  |
| }!|!| }"|!|" } t| }#|#dkr| \}$}%}&}'}(})}*}+t|+},|*|,d   krdkrn n|,d }*|"|+O }"t|"|,k}-|-s|*d7 }*|)|* |' |kr|-s|'|k rt | |$|%}.|!|$ |!||$O }!||$= ||$ |r.|&| |&||< ||$d |rv|-rT|||.|&|'|(|)|*|"f n |||.|&|'d |(|)d |*|"f n
|.||< P n`|%||$< ||$ |r|*t|d krt|d }*|||| |r|gndd|(d|*|"f nP ng }&d}'d}(d}/d})d}*t }+d}0||# d }1||# d= xv|1D ]n\}2}3}4}5}6}7}8}9|5dkrP|/d7 }/n|5|'kr^|5}'|(|67 }(|)|77 })|*|87 }*t|9|0krt|9}0|+|9O }+q(W t|+},|*t|#d td|,|0 7 }*|*|,d   krdkrn n|,d }*|"|+O }"t|"|,k}-|-s|*d7 }*|)|* |' |kr|/|kr|(|kr|'|kr|-sH|'|k r | }.t }:x^|1D ]V};|;d }<t|.|<|;d }.||<= |:||<O }:||< |r\||<d |&|;d  q\W |!| 8 }!|!|:O }!|r|&| |&||< |r|||.|&|'d |(|)d |*|"f n
|.||< P nx*|1D ]"};|;d ||;d < ||;d  qW |r|(|krR|}(|*t|d krpt|d }*|||| |r|gndd|(d|*|"f nP || d }qW qJW |rt|||
|| |r.x\| D ]P\}=}>|	|>}?|?dk	r|?|kr||= ||?< |?||=< |
|= |
|?< |?h|
|=< qW ||
fS )a[   Fuse tasks that form reductions; more advanced than ``fuse_linear``

    This trades parallelism opportunities for faster scheduling by making tasks
    less granular.  It can replace ``fuse_linear`` in optimization passes.

    This optimization applies to all reductions--tasks that have at most one
    dependent--so it may be viewed as fusing "multiple input, single output"
    groups of tasks into a single task.  There are many parameters to fine
    tune the behavior, which are described below.  ``ave_width`` is the
    natural parameter with which to compare parallelism to granularity, so
    it should always be specified.  Reasonable values for other parameters
    with be determined using ``ave_width`` if necessary.

    Parameters
    ----------
    dsk: dict
        dask graph
    keys: list or set, optional
        Keys that must remain in the returned dask graph
    dependencies: dict, optional
        {key: [list-of-keys]}.  Must be a list to provide count of each key
        This optional input often comes from ``cull``
    ave_width: float (default 2)
        Upper limit for ``width = num_nodes / height``, a good measure of
        parallelizability
    max_width: int
        Don't fuse if total width is greater than this
    max_height: int
        Don't fuse more than this many levels
    max_depth_new_edges: int
        Don't fuse if new dependencies are added after this many levels
    rename_keys: bool or func, optional
        Whether to rename the fused keys with ``default_fused_keys_renamer``
        or not.  Renaming fused keys can keep the graph more understandable
        and comprehensive, but it comes at the cost of additional processing.
        If False, then the top-most key will be used.  For advanced usage, a
        function to create the new name is also accepted.
    fuse_subgraphs : bool, optional
        Whether to fuse multiple tasks into ``SubgraphCallable`` objects.

    Returns
    -------
    dsk: output graph with keys fused
    dependencies: dict mapping dependencies after fusion.  Useful side effect
        to accelerate other downstream optimizations.
    NZfuse_ave_widthr   Zfuse_max_heightZfuse_max_depth_new_edgesg      ?Zfuse_max_widthfuse_subgraphsFZfuse_rename_keysTc                s   i | ]}t  |d d|qS )T)r   )r   )r   r   )r   r   r   r   *  s    zfuse.<locals>.<dictcomp>c             S   s    h | ]\}}t |d kr|qS )r   )r3   )r   r   valsr   r   r   rl   7  s    zfuse.<locals>.<setcomp>c             s   s   | ]}t t|d kV  qdS )r   N)r3   r   )r   r7   r   r   r   rO   ;  s    zfuse.<locals>.<genexpr>r   r-   rf   )r   r   r   r   r   getr3   mathlogrp   r   r:   r    allrR   rS   r<   r   r>   extendr   intminmax_inplace_fuse_subgraphs)@r   r!   r#   Z	ave_widthZ	max_widthZ
max_heightZmax_depth_new_edgesr?   rq   rH   r&   Zrdepsr   rr   r7   Z	reduciblerI   fused_treesZ
info_stackZchildren_stackZdeps_popZreducible_addZreducible_popZreducible_removeZfused_trees_popZinfo_stack_appendZinfo_stack_popZchildren_stack_appendZchildren_stack_extendZchildren_stack_poprB   rD   ZchildrenZdeps_parentZedgesZnum_childrenZ	child_keyZ
child_taskZ
child_keysZheightwidthZ	num_nodesZfudgeZchildren_edgesZnum_children_edgesZno_new_edgesrK   Znum_single_nodesZmax_num_edgesZchildren_infoZcur_keyZcur_taskZcur_keysZ
cur_heightZ	cur_widthZcur_num_nodesZ	cur_fudgeZ	cur_edgesZchildren_depsZ
child_infoZ	cur_childZroot_keyZ
fused_keysaliasr   )r   r   fuse  s   1









 






 





"

r   c                s(  i }t  }x D ]|}|| }t|dk}	xb|D ]Z}
|dk	rN|
|krN||
 q.|
|krh||
= ||
 q.|	rx||
 q.|
|kr.|||
< q.W qW g }dd | D }x|rX| \}
}|
|g}x&||kr||}||= || qW |  x(|
|kr||
}
||
= ||
 qW d}x4|D ],}|t | 7 }|dkr&|| P q&W qW x|D ]} fdd|D }|d }||d   }||< x"|dd D ]}||=  |= qW t	|}t
|||f|  |< |r`g }x6|D ].}||d}|r|| n
|| qW |||< q`W dS )	zJSubroutine of fuse.

    Mutates dsk, depenencies, and fused_trees inplacer   Nc             S   s   i | ]\}}||qS r   r   )r   r   r7   r   r   r   r     s    z+_inplace_fuse_subgraphs.<locals>.<dictcomp>r   c                s   i | ]} | |qS r   r   )r   r   )r   r   r   r   3  s    r-   F)r   r3   r   r:   r;   r<   r    r=   r
   r2   SubgraphCallablerw   )r   r!   r#   r|   r?   r@   rA   rB   r&   rC   rD   rE   rF   rG   ZntasksrL   ZsubgraphoutkeyZ
inkeys_setr   inkeysZchain2Zsubchainr   )r   r   r{     sh    









r{   z[a-f]+c             C   s
  t | tkr|  } t | tkr(| d } y| d}|d d  sT|d d}n|d }xF|dd D ]6}| rt|dkrt	|dk	s|d| 7 }qjP qjW t|dkrt
	d|rd	S |d d
kr|d d dd }|S W n tk
r   dS X dS )aQ  
    >>> key_split('x')
    'x'
    >>> key_split('x-1')
    'x'
    >>> key_split('x-1-2-3')
    'x'
    >>> key_split(('x-2', 1))
    'x'
    >>> key_split("('x-2', 1)")
    'x'
    >>> key_split('hello-world-1')
    'hello-world'
    >>> key_split(b'hello-world-1')
    'hello-world'
    >>> key_split('ae05086432ca935f6eba409a8ecd4896')
    'data'
    >>> key_split('<module.submodule.myclass object at 0xdaf372')
    'myclass'
    >>> key_split(None)
    'Other'
    >>> key_split('x-abcdefab')  # ignores hex
    'x'
    >>> key_split('_(x)')  # strips unpleasant characters
    'x'
    r   r.   z_'()"r   N       z[a-f0-9]{32}data<z<>.r-   ZOther)r/   bytesdecoder2   splitisalphastripr3   hex_patternmatchre	Exception)sZwordsresultZwordr   r   r   r+   Q  s*    
r+   c               @   sF   e Zd ZdZdZdddZdd Zdd	 Zd
d Zdd Z	dd Z
dS )r   aD  Create a callable object from a dask graph.

    Parameters
    ----------
    dsk : dict
        A dask graph
    outkey : hashable
        The output key from the graph
    inkeys : list
        A list of keys to be used as arguments to the callable.
    name : str, optional
        The name to use for the function.
    )r   r   r   namesubgraph_callablec             C   s   || _ || _|| _|| _d S )N)r   r   r   r   )selfr   r   r   r   r   r   r   __init__  s    zSubgraphCallable.__init__c             C   s   | j S )N)r   )r   r   r   r   __repr__  s    zSubgraphCallable.__repr__c             C   sH   t | t |koF| j|jkoF| j|jkoFt| jt|jkoF| j|jkS )N)r/   r   r   r   r   r   )r   otherr   r   r   __eq__  s
    zSubgraphCallable.__eq__c             C   s
   | |k S )Nr   )r   r   r   r   r   __ne__  s    zSubgraphCallable.__ne__c             G   sJ   t |t | jks,tdt | jt |f t| j| jtt| j|S )NzExpected %d args, got %d)	r3   r   
ValueErrorr   rs   r   r   r   zip)r   argsr   r   r   __call__  s
    zSubgraphCallable.__call__c             C   s   t | j| j| j| jffS )N)r   r   r   r   r   )r   r   r   r   
__reduce__  s    zSubgraphCallable.__reduce__N)r   )__name__
__module____qualname____doc__	__slots__r   r   r   r   r   r   r   r   r   r   r     s   
r   )NNT)NTN)NFN)NNNNNNNN))Z
__future__r   r   r   rt   r   operatorr    r   r   Zcompatibilityr	   r
   r   r   r   r   r   r   Z
utils_testr   r   r*   r6   rM   rN   rX   r`   rc   rY   re   rk   rp   r   r{   compiler   r+   objectr   r   r   r   r   <module>   s:   $.
 

: 
3*  
  :I
5