B
    T\U                 @   sx  d Z ddlmZmZmZ ddlZddlZddlmZm	Z	m
Z
 ddlmZmZmZmZ ddlZddlZddlmZmZ ddlmZ dd	lmZ dd
lmZ ddlmZmZmZ ddlmZ ddl m!Z! ddl"m#Z# dd Z$dd Z%dd Z&dd Z'dd Z(d6ddZ)dd Z*dd Z+d d! Z,d"d# Z-d$d% Z.d&d' Z/d(d) Z0d7d*d+Z1d,d- Z2G d.d/ d/e3Z4d0d1 Z5d2d3 Z6d4d5 Z7dS )8z
The rechunk module defines:
    intersect_chunks: a function for
        converting chunks to new dimensions
    rechunk: a function to convert the blocks
        of an existing dask array to new chunks or blockshape
    )absolute_importdivisionprint_functionN)productchaincount)getitemaddmul
itemgetter)
accumulatereduce   )tokenize)HighLevelGraph)parse_bytes   )concatenate3Arraynormalize_chunks)validate_axis)empty)configc                s    fdd| D S )z Internal utility for cumulative sum with label.

    >>> cumdims_label(((5, 3, 3), (2, 2, 1)), 'n')  # doctest: +NORMALIZE_WHITESPACE
    [(('n', 0), ('n', 5), ('n', 8), ('n', 11)),
     (('n', 0), ('n', 2), ('n', 4), ('n', 5))]
    c          	      s2   g | ]*}t t fd t|  ttd| qS )r   )r   )tupleziplenr   r	   ).0Zbds)const 1lib/python3.7/site-packages/dask/array/rechunk.py
<listcomp>$   s   z!cumdims_label.<locals>.<listcomp>r   )chunksr   r   )r   r   cumdims_label   s    
r"   c             C   s   t t| | tddS )aG  

    >>> new = cumdims_label(((2, 3), (2, 2, 1)), 'n')
    >>> old = cumdims_label(((2, 2, 1), (5,)), 'o')

    >>> _breakpoints(new[0], old[0])
    (('n', 0), ('o', 0), ('n', 2), ('o', 2), ('o', 4), ('n', 5), ('o', 5))
    >>> _breakpoints(new[1], old[1])
    (('n', 0), ('o', 0), ('n', 2), ('n', 4), ('n', 5), ('o', 5))
    r   )key)r   sortedr   )ZcumoldZcumnewr   r   r   _breakpoints)   s    r%   c             C   s   d}d}d}g }g }xt dt| D ]}| | \}}| |d  \}	}
|	dkr^|r^|| g }|	dkrld}n|}||
 | }|}||
krq$||t||f |dkr$|d7 }d}q$W |r|| |S )aB  
    Internal utility to intersect chunks for 1d after preprocessing.

    >>> new = cumdims_label(((2, 3), (2, 2, 1)), 'n')
    >>> old = cumdims_label(((2, 2, 1), (5,)), 'o')

    >>> _intersect_1d(_breakpoints(old[0], new[0]))  # doctest: +NORMALIZE_WHITESPACE
    [[(0, slice(0, 2, None))],
     [(1, slice(0, 2, None)), (2, slice(0, 1, None))]]
    >>> _intersect_1d(_breakpoints(old[1], new[1]))  # doctest: +NORMALIZE_WHITESPACE
    [[(0, slice(0, 2, None))],
     [(0, slice(2, 4, None))],
     [(0, slice(4, 5, None))]]

    Parameters
    ----------

    breaks: list of tuples
        Each tuple is ('o', 8) or ('n', 8)
        These are pairs of 'o' old or new 'n'
        indicator with a corresponding cumulative sum.

    Uses 'o' and 'n' to make new tuples of slices for
    the new block crosswalk to old blocks.
    r   r   no)ranger   appendslice)ZbreaksstartZlast_endZold_idxZretZret_nextidxZlabelbrZ
last_labelZlast_brendr   r   r   _intersect_1d7   s4    

r/   c             C   s   dd | D }dd |D }dd | D }dd |D }t |d}t |d}dd |D }d	d |D }	||	kstd
||	f ||kstddd t||D }
x6t|D ]*\}}|rdd t|D }|
|| qW |
S )a   Helper to build old_chunks to new_chunks.

    Handles missing values, as long as the missing dimension
    is unchanged.

    Examples
    --------
    >>> old = ((10, 10, 10, 10, 10), )
    >>> new = ((25, 5, 20), )
    >>> _old_to_new(old, new)  # doctest: +NORMALIZE_WHITESPACE
    [[[(0, slice(0, 10, None)), (1, slice(0, 10, None)), (2, slice(0, 5, None))],
      [(2, slice(5, 10, None))],
      [(3, slice(0, 10, None)), (4, slice(0, 10, None))]]]
    c             S   s"   g | ]}t d d |D s|qS )c             s   s   | ]}t |V  qd S )N)mathisnan)r   yr   r   r   	<genexpr>   s    z)_old_to_new.<locals>.<listcomp>.<genexpr>)any)r   xr   r   r   r       s    z_old_to_new.<locals>.<listcomp>c             S   s"   g | ]}t d d |D s|qS )c             s   s   | ]}t |V  qd S )N)r0   r1   )r   r2   r   r   r   r3      s    z)_old_to_new.<locals>.<listcomp>.<genexpr>)r4   )r   r5   r   r   r   r       s    c             S   s   g | ]}t d d |D qS )c             s   s   | ]}t |V  qd S )N)r0   r1   )r   r2   r   r   r   r3      s    z)_old_to_new.<locals>.<listcomp>.<genexpr>)sum)r   r5   r   r   r   r       s    c             S   s   g | ]}t d d |D qS )c             s   s   | ]}t |V  qd S )N)r0   r1   )r   r2   r   r   r   r3      s    z)_old_to_new.<locals>.<listcomp>.<genexpr>)r6   )r   r5   r   r   r   r       s    r'   r&   c             S   s   g | ]}t |qS r   )r6   )r   r'   r   r   r   r       s    c             S   s   g | ]}t |qS r   )r6   )r   r&   r   r   r   r       s    z&Cannot change dimensions from %r to %rz2Chunks must be unchanging along unknown dimensionsc             S   s"   g | ]}t t|d  |d qS )r   r   )r/   r%   )r   cmr   r   r   r       s    c             S   s   g | ]}|t d dfgqS )r   N)r*   )r   ir   r   r   r       s    )r"   
ValueErrorr   	enumerater(   insert)
old_chunks
new_chunksZ	old_knownZ	new_knownZ	n_missingZ
n_missing2ZcmoZcmnZsumsZsums2
old_to_newr,   ZmissingZextrar   r   r   _old_to_newp   s$    

r?   c             C   s(   t | |}t| }tdd |D }|S )a  
    Make dask.array slices as intersection of old and new chunks.

    >>> intersections = intersect_chunks(((4, 4), (2,)),
    ...                                  ((8,), (1, 1)))
    >>> list(intersections)  # doctest: +NORMALIZE_WHITESPACE
    [(((0, slice(0, 4, None)), (0, slice(0, 1, None))),
      ((1, slice(0, 4, None)), (0, slice(0, 1, None)))),
     (((0, slice(0, 4, None)), (0, slice(1, 2, None))),
      ((1, slice(0, 4, None)), (0, slice(1, 2, None))))]

    Parameters
    ----------

    old_chunks : iterable of tuples
        block sizes along each dimension (convert from old_chunks)
    new_chunks: iterable of tuples
        block sizes along each dimension (converts to new_chunks)
    c             s   s   | ]}t t| V  qd S )N)r   r   )r   crr   r   r   r3      s    z#intersect_chunks.<locals>.<genexpr>)r?   r   r   )r<   r=   r>   cross1Zcrossr   r   r   intersect_chunks   s    
rB   c                s2  t |trJ fdd| D }x(t jD ]}||kr, j| ||< q,W t |ttfrrtdd t| jD }t	| j
| j jd}| jkr S  j}t||kstdttt|}x<t| j
D ],\}}||krt|st|stdqW t j| jj||}	x|	D ]}
t |
 qW  S )aV  
    Convert blocks in dask array x for new chunks.

    Parameters
    ----------
    x: dask array
        Array to be rechunked.
    chunks:  int, tuple or dict
        The new block dimensions to create. -1 indicates the full size of the
        corresponding dimension.
    threshold: int
        The graph growth factor under which we don't bother introducing an
        intermediate step.
    block_size_limit: int
        The maximum block size (in bytes) we want to produce
        Defaults to the configuration value ``array.chunk-size``

    Examples
    --------
    >>> import dask.array as da
    >>> x = da.ones((1000, 1000), chunks=(100, 100))

    Specify uniform chunk sizes with a tuple

    >>> y = x.rechunk((1000, 10))

    Or chunk only specific dimensions with a dictionary

    >>> y = x.rechunk({0: 1000})

    Use the value ``-1`` to specify that you want a single chunk along a
    dimension or the value ``"auto"`` to specify that dask can freely rechunk a
    dimension to attain blocks of a uniform block size

    >>> y = x.rechunk({0: -1, 1: 'auto'}, block_size_limit=1e8)
    c                s   i | ]\}}|t | jqS r   )r   ndim)r   cv)r5   r   r   
<dictcomp>   s    zrechunk.<locals>.<dictcomp>c             s   s"   | ]\}}|d k	r|n|V  qd S )Nr   )r   ZlcZrcr   r   r   r3      s   zrechunk.<locals>.<genexpr>)limitdtypeZprevious_chunksz-Provided chunks are not consistent with shape)
isinstancedictitemsr(   rC   r!   r   listr   r   shaperH   r   r9   mapr6   r0   r1   plan_rechunkitemsize_compute_rechunk)r5   r!   	thresholdblock_size_limitr8   rC   Z
new_shapesnewoldstepsrD   r   )r5   r   rechunk   s0    %



rW   c             C   s   t ttt| S )N)r   r
   rN   r   )r!   r   r   r   _number_of_blocks   s    rX   c             C   s   t ttt| S )N)r   r
   rN   max)r!   r   r   r   _largest_block_size   s    rZ   c             C   s   t tdd t| |D }|S )z; Estimate the graph size during a rechunk computation.
    c             s   s"   | ]\}}t |t | V  qd S )N)r   )r   ocncr   r   r   r3     s   z&estimate_graph_size.<locals>.<genexpr>)r   r
   r   )r<   r=   Zcrossed_sizer   r   r   estimate_graph_size   s    
r]   c             C   sh   g }xZ| D ]R}t t|| }x.t|D ]"}|||  }|| ||8 }q*W |dks
tq
W t|S )zq Minimally divide the given chunks so as to make the largest chunk
    width less or equal than *max_width*.
    r   )intnpZceilr(   r)   AssertionErrorr   )desired_chunksZ	max_widthr!   rD   Z
nb_dividesr8   r&   r   r   r   divide_to_width	  s    

rb   c                s  t  |kr S t }t |dkr|| }t  }|| }|| }|||  }|||  | }|| f| |f||   S t | }t  | }	 fddtt  d D }
t|
 t }x|	dkrt|
\}}}|| dkr4|d7 }x|| dkr|d7 }qW t	|
|| ||  ||f qn6|| ||  |krjt	|
|| ||  ||f q|| dks|t
d||< |||< |	d8 }	qW ttd|S )z Minimally merge the given chunks so as to drop the number of
    chunks below *max_number*, while minimizing the largest width.
    r   c                s*   g | ]"} |  |d    ||d  fqS )r   r   )r   r8   )ra   r   r   r    /  s   z#merge_to_number.<locals>.<listcomp>r   N)r   setpopr6   r(   heapqheapifyrL   heappopheappushr`   r   filter)ra   
max_numberZdistinctwr&   ZtotalZdesired_widthwidthZadjustZnmergesheapr!   r8   jr   )ra   r   merge_to_number  sB    

ro   c                s^  t | }dd | D dd |D dd tt| |D fddt|D  fddt|D } fdd	}t||d
}tt}t| }d}	x|D ]}
||
  |
 pd }||kr||
 ||
< |}q|
 }t|| | }t	||
 |}t |t | |
 kr*|||
< |t
| | }d}	qW |t|ksDt||ksRtt||	fS )z
    Find an intermediate rechunk that would merge some adjacent blocks
    together in order to get us nearer the *new_chunks* target, without
    violating the *block_size_limit* (in number of elements).
    c             S   s   g | ]}t |qS r   )rY   )r   rD   r   r   r   r    T  s    z&find_merge_rechunk.<locals>.<listcomp>c             S   s   g | ]}t |qS r   )rY   )r   rD   r   r   r   r    U  s    c             S   s&   i | ]\}\}}t |t | |qS r   )r   )r   dimr[   r\   r   r   r   rF   W  s   z&find_merge_rechunk.<locals>.<dictcomp>c                s"   i | ]} | | pd  |qS )r   r   )r   rp   )new_largest_widthold_largest_widthr   r   rF   \  s   c                s   g | ]} | d kr|qS )g      ?r   )r   rp   )graph_size_effectr   r   r    d  s    c                s<   |  } |  }|dkrd}|dkr8t |t | S dS )Nr   g0D   ?r   )r_   log)kZgseZbse)block_size_effectrs   r   r   r#   m  s
    zfind_merge_rechunk.<locals>.key)r#   Fr   T)r   r:   r   r(   r$   r   r
   rL   r^   rb   rY   rZ   r`   r   )r<   r=   rS   rC   Zmerge_candidatesr#   Zsorted_candidatesZlargest_block_sizer!   memory_limit_hitrp   Znew_largest_block_sizeZlargest_widthZchunk_limitrD   r   )rv   rs   rq   rr   r   find_merge_rechunkL  s8    	

rx   c       	      C   s   t | }t| }xt|D ]}t||}||kr2P t | | t || krLqtt | | | | }t|| |}t ||kstt |t | | krt|t| | kr|||< qW t|S )z
    Find an intermediate rechunk that would split some chunks to
    get us nearer *new_chunks*, without violating the *graph_size_limit*.
    )	r   rL   r(   r]   r^   ro   r`   rY   r   )	r<   r=   Zgraph_size_limitrC   r!   rp   
graph_sizerj   rD   r   r   r   find_split_rechunk  s    
(rz   c             C   s.  |pt d}|pt d}t|tr.t|}t|}g }dd | D }|dks`t|r`t|rj||g S || }t| }t|}	t	|||	g}|t
| t
|  }
| }d}xvt||}||
k rP |r|}nt|||| }t|||\}}||kr|r||krP || |}|sP d}qW ||g S )a4   Plan an iterative rechunking from *old_chunks* to *new_chunks*.
    The plan aims to minimize the rechunk graph size.

    Parameters
    ----------
    itemsize: int
        The item size of the array
    threshold: int
        The graph growth factor under which we don't bother
        introducing an intermediate step
    block_size_limit: int
        The maximum block size (in bytes) we want to produce during an
        intermediate step

    Notes
    -----
    No intermediate steps will be planned if any dimension of ``old_chunks``
    is unknown.
    zarray.rechunk-thresholdzarray.chunk-sizec             S   s   g | ]}t d d |D qS )c             s   s   | ]}t |V  qd S )N)r0   r1   )r   r2   r   r   r   r3     s    z*plan_rechunk.<locals>.<listcomp>.<genexpr>)r4   )r   r5   r   r   r   r      s    z plan_rechunk.<locals>.<listcomp>r   TF)r   getrI   strr   r   allr4   rZ   rY   rX   r]   rz   rx   r)   )r<   r=   rP   rR   rS   rC   rV   Zhas_nansZlargest_old_blockZlargest_new_blockZgraph_size_thresholdZcurrent_chunksZ
first_passry   r!   rw   r   r   r   rO     sJ    





rO   c                s  j dkrtj|jdS j}tj|}t }t }t|}d| }d| }t	 }	t
jdd jD dd}
x$t
|
jD ]}jf| |
|< qW td	d
 |D  }x6t||D ]&\} |f| } fddt|D fddt|D }t
j|dd}|j}xt D ]|\}}t| \}}|t|	f}|
| dd }tfdd
tt||D rx|
| ||< nt|
| |f||< |||< qW ||j d ksttdd
 |jD r|jd ||< qt| f||< qW ~
~t||}tj||gd}t|||jdS )z7 Compute the rechunk of *x* to the given *chunks*.
    r   )r!   rH   zrechunk-merge-zrechunk-split-c             S   s   g | ]}t |qS r   )r   )r   rD   r   r   r   r      s    z$_compute_rechunk.<locals>.<listcomp>O)rH   c             s   s   | ]}t t|V  qd S )N)r(   r   )r   rD   r   r   r   r3     s    z#_compute_rechunk.<locals>.<genexpr>c                s   g | ]  fd dD qS )c                s   g | ]}|  d  qS )r   r   )r   r@   )r8   r   r   r      s    z/_compute_rechunk.<locals>.<listcomp>.<listcomp>r   )r   )rA   )r8   r   r      s    c                s   g | ]}t t | qS r   )r   rc   )r   r8   )old_block_indicesr   r   r      s   r   Nc             3   s4   | ],\}\}}|j d ko*|j j| | kV  qdS )r   N)r+   stopr!   )r   r8   ZslcZind)r5   r   r   r3   )  s   c             s   s   | ]}|d kV  qdS )r   Nr   )r   dr   r   r   r3   3  s    )Zdependencies)sizer   rM   rH   rC   rB   r!   rJ   r   r   r_   Zndindexnamer   r   r(   Zflatr:   nextr}   r   r`   r   tolisttoolzmerger   Zfrom_collectionsr   )r5   r!   rC   ZcrossedZx2ZintermediatestokenZ
merge_nameZ
split_nameZsplit_name_suffixesZ
old_blocksindexZ	new_indexZnew_idxr#   Zsubdims1Zrec_cat_argZrec_cat_arg_flatZrec_cat_indexZ
ind_slicesZold_block_indexZslicesr   Z	old_indexZlayerZgraphr   )rA   r   r5   r   rQ     sL    



rQ   c               @   s    e Zd Zdd Zdd ZeZdS )_PrettyBlocksc             C   s
   || _ d S )N)blocks)selfr   r   r   r   __init__A  s    z_PrettyBlocks.__init__c             C   s:  g }g }d}x| j D ]}|rh|d |krh|dkr^t|dkr^|d |d d f |dd  }|d7 }q|dkrt|dkst||d |d f g }d}|| qW |r|dkr|d |f n&t|dkst||d |d f g }x<|D ]4\}}|d kr|t| q|d||f  qW d|S )Nr   r   z%d*[%s]z | )r   r   r)   r`   r|   join)r   runsrunZrepeatsrD   partsr   r   r   __str__D  s4    

z_PrettyBlocks.__str__N)__name__
__module____qualname__r   r   __repr__r   r   r   r   r   ?  s    r   c             C   s(   t | trtdd | D s tt| S )z
    Pretty-format *blocks*.

    >>> format_blocks((10, 10, 10))
    3*[10]
    >>> format_blocks((2, 3, 4))
    [2, 3, 4]
    >>> format_blocks((10, 10, 5, 6, 2, 2, 2, 7))
    2*[10] | [5, 6] | 3*[2] | [7]
    c             s   s"   | ]}t |tpt|V  qd S )N)rI   r^   r0   r1   )r   r5   r   r   r   r3   s  s   z format_blocks.<locals>.<genexpr>)rI   r   r}   r`   r   )r   r   r   r   format_blocksg  s    
r   c             C   s    t | tsttdd | D S )zH
    >>> format_chunks((10 * (3,), 3 * (10,)))
    (10*[3], 3*[10])
    c             s   s   | ]}t |V  qd S )N)r   )r   rD   r   r   r   r3   ~  s    z format_chunks.<locals>.<genexpr>)rI   r   r`   )r!   r   r   r   format_chunksx  s    r   c             C   s   dd | D S )zs
    >>> format_plan([((10, 10, 10), (15, 15)), ((30,), (10, 10, 10))])
    [(3*[10], 2*[15]), ([30], 3*[10])]
    c             S   s   g | ]}t |qS r   )r   )r   rD   r   r   r   r      s    zformat_plan.<locals>.<listcomp>r   )Zplanr   r   r   format_plan  s    r   )NN)NN)8__doc__Z
__future__r   r   r   r0   re   	itertoolsr   r   r   operatorr   r	   r
   r   Znumpyr_   r   r   r   baser   Zhighlevelgraphr   Zutilsr   Zcorer   r   r   r   Zwrapr    r   r"   r%   r/   r?   rB   rW   rX   rZ   r]   rb   ro   rx   rz   rO   rQ   objectr   r   r   r   r   r   r   r   <module>   sF   9)
C
4I 
O=(	