B
    T\xI                 @   s  d dl mZmZmZ d dlZd dlmZ d dlZd dlZ	d dl
Zd dlZddlmZ ddlmZmZmZmZmZ ddlmZ ddlmZ d	d
lmZmZ d	dlmZmZmZ d	dlmZ d	dl m!Z! d	dl"m"Z" d	dlm#Z#m$Z$m%Z% edkrd dl&m'Z' nd dl(m'Z' dAddZ)dd Z*dBddZ+dCddZ,dDdd Z-dEd!d"Z.G d#d$ d$e/Z0dFd%d&Z1dGd'd(Z2d)d* Z3d+d, Z4d-d. Z5d/d0 Z6d1d2 Z7d3d4 Z8d5d6 Z9d7d8 Z:d9d: Z;d;d< Z<dHd=d>Z=d?d@ Z>dS )I    )absolute_importdivisionprint_functionN)getitem   )drop_columns)	DataFrameSeries_Frame_concatmap_partitions)hash_pandas_object)PANDAS_VERSION   )baseconfig)tokenizecomputecompute_as_if_collection)delayed)HighLevelGraph)sizeof)digitinsertMz0.20.0)groupsort_indexerFT      ?    Ac	          	      s  t |tr|j| jjkr| S t |tttfr<tdt| |dkrVd}
t	d| j
}n|dkrd| j
}d}
t |ts|| | }n|} dkr|
rt|| \}} | jdd}dd	 |D }nt|\}g }|j||d
 |jdd}dd	 |D }dd	 |D }t|||\}}}tj |||dd\ }}}   t  }|
sR|rt|}t	t|| d}t|| j
}t }y8tjtd|d |d td|d | d  W nH ttfk
r   td|d |d t} fdd	|D  Y nX t |}t |}|t!|kr|t!|krtdd t"|dd |dd D r||d g  t#| || d}|$t%j&S t'| | f|||d|	S )z$ See _Frame.set_index for docstring z~Dask dataframe does not yet support multi-indexes.
You tried to index with this index: %s
Indexes must be single columns only.autoTd   NF)Zoptimize_graphc             S   s   g | ]}t t|qS  )r   r   ).0partr    r    5lib/python3.7/site-packages/dask/dataframe/shuffle.py
<listcomp>:   s    zset_index.<locals>.<listcomp>)upsamplec             S   s   g | ]}|  qS r    )min)r!   ipartr    r    r#   r$   A   s    c             S   s   g | ]}|  qS r    )max)r!   r'   r    r    r#   r$   B   s    r   r   )xZxpfpc                s   g | ]} | qS r    r    )r!   i)	divisionsr    r#   r$   T   s    c             s   s   | ]\}}||k V  qd S )Nr    )r!   ZmxZmnr    r    r#   	<genexpr>Z   s    zset_index.<locals>.<genexpr>)dropr,   )shuffler/   r   )(
isinstancer	   _nameindexr   tuplelistNotImplementedErrorstrr(   npartitionsr   optimizeZ
to_delayedZ_repartition_quantilesr   tolistpdisnullallsummathceilr&   lennpZinterpZlinspace	TypeError
ValueErrorastypeintremove_nanssortedzipset_sorted_indexr   r   
sort_indexset_partition)dfr3   r8   r0   r   r/   r%   r,   Zpartition_sizekwargsZrepartitionZindex2partsZsizesZipartsminsmaxesZempty_dataframe_detectedZtotalnZindexesresultr    )r,   r#   	set_index   sh    


*rT   c             C   s   t | } x:tt| d ddD ]"}t| | r| |d  | |< qW xTtt| d ddD ]<}t| | sZx&t|d t| D ]}| | | |< qW P qZW | S )a   Remove nans from divisions

    These sometime pop up when we call min/max on an empty partition

    Examples
    --------
    >>> remove_nans((np.nan, 1, 2))
    [1, 1, 2]
    >>> remove_nans((1, np.nan, 2))
    [1, 2, 2]
    >>> remove_nans((1, 2, np.nan))
    [1, 2, 2]
    r   r.   r   )r5   rangerA   r;   r<   )r,   r+   jr    r    r#   rG   e   s    rG       c             C   s   t |r4| | jt|tdgd}| j|d}n&|jt|tdgd}| j||d}t|d|t|d ||d}	t |r|	jt	||| j
jd}
n|	jt|j|| j
jd}
||
_|
tjS )	a(   Group DataFrame by index

    Sets a new index and partitions data along that index according to
    divisions.  Divisions are often found by computing approximate quantiles.
    The function ``set_index`` will do both of these steps.

    Parameters
    ----------
    df: DataFrame/Series
        Data that we want to re-partition
    index: string or Series
        Column to become the new index
    divisions: list
        Values to form new divisions between partitions
    drop: bool, default True
        Whether to delete columns to be used as the new index
    shuffle: str (optional)
        Either 'disk' for an on-disk shuffle or 'tasks' to use the task
        scheduling framework.  Use 'disk' if you are on a single machine
        and 'tasks' if you are on a distributed cluster.
    max_branch: int (optional)
        If using the task-based shuffle, the amount of splitting each
        partition undergoes.  Increase this for fewer copies but more
        scheduler overhead.

    See Also
    --------
    set_index
    shuffle
    partd
    r   )r,   meta)_partitions)rY   _indexrY   r   )
max_branchr8   r0   r   )
index_namer/   column_dtype)rB   Zisscalarr   set_partitions_prer;   r	   assignrearrange_by_columnrA   set_index_post_scalarcolumnsdtypeset_index_post_seriesnamer,   r   rK   )rM   r3   r,   r[   r/   r0   r   
partitionsdf2df3df4r    r    r#   rL      s&    !



rL   c       
      C   sx   t |ts| |}|jt|p"| jtdgd}| j|d}| j	j
j|j	j
_t|d||||d}|td| jj}	|	S )a   Group DataFrame by index

    Hash grouping of elements. After this operation all elements that have
    the same index will be in the same partition. Note that this requires
    full dataset read, serialization and shuffle. This is expensive. If
    possible you should avoid shuffles.

    This does not preserve a meaningful index/partitioning scheme. This is not
    deterministic if done in parallel.

    See Also
    --------
    set_index
    set_partition
    shuffle_disk
    shuffle_tasks
    r   )r8   rX   )rY   rY   )r8   r[   r0   r   )r1   r
   Z_select_columns_or_indexr   partitioning_indexr8   r;   r	   r_   _metar3   re   r`   r   rb   rc   )
rM   r3   r0   r8   r[   r   rf   rg   rh   ri   r    r    r#   r0      s    

r0   c       	      C   sX   | | j t|tdgd}| j|d}t|d|t|d |d}| td| jj	}|S )z< Shuffle dataframe so that column separates along divisions r   )r,   rX   )rY   rY   r   )r[   r8   r0   )
r   r^   r;   r	   r_   r`   rA   r   rb   rc   )	rM   columnr,   r[   r0   rf   rg   rh   ri   r    r    r#   rearrange_by_divisions   s    
rm   c             C   sR   |pt dd pd}|dkr,t| |||dS |dkrBt| |||S td| d S )Nr0   Zdisk)r   ZtaskszUnknown shuffle method %s)r   getrearrange_by_column_diskrearrange_by_column_tasksr6   )rM   colr8   r[   r0   r   r    r    r#   r`      s    r`   c               @   s*   e Zd ZdZd
ddZdd Zdd	 ZdS )maybe_buffered_partdzUIf serialized, will return non-buffered partd. Otherwise returns a
    buffered partdTNc             C   s   |pt dd | _|| _d S )NZtemporary_directory)r   rn   tempdirbuffer)selfrt   rs   r    r    r#   __init__   s    zmaybe_buffered_partd.__init__c             C   s    | j rtd| j ffS tdfS d S )NF)F)rs   rr   )ru   r    r    r#   
__reduce__   s    zmaybe_buffered_partd.__reduce__c             O   sP   dd l }| jr|j| jd}n| }| jrB||| |S ||S d S )Nr   )dir)partdrs   ZFilert   ZPandasBlocksZBufferZDict)ru   argsrN   ry   filer    r    r#   __call__  s    zmaybe_buffered_partd.__call__)TN)__name__
__module____qualname____doc__rv   rw   r|   r    r    r    r#   rr      s   
rr   c                s8  dkrj t}t j}d| ft fi}d| fddt D }g }i }	|rt	j
||}
t|g}tt|
|\}}|i}ttt||}n
| d|   tt|fi}d|  fddtD }d	d
  }t	||||}	tj|	|d}
t|
j|S )z Shuffle using local disk Nzzpartd-zshuffle-partition-c                s$   i | ]\}}t | f|fqS r    )shuffle_group_3)r!   r+   key)rl   re   r8   pr    r#   
<dictcomp>  s   z,rearrange_by_column_disk.<locals>.<dictcomp>zbarrier-zshuffle-collect-c                s"   i | ]}t |j f|fqS r    )collectrk   )r!   r+   )barrier_tokenrM   re   r   r    r#   r   1  s   )Nr   )dependencies)r8   r   uuidZuuid1hexrr   	enumerate__dask_keys__r   mergeZdaskrH   r   r   dictrI   appendbarrierr5   rU   toolzfrom_collectionsrk   )rM   rl   r8   r   tokenZalways_new_tokenZdsk1Zdsk2r   ZlayergraphkeysZppvaluesZdsk3Zdsk4r,   r    )r   rl   rM   re   r8   r   r#   ro     s6    


ro   c                s`  |pd}j tttt| dkrLttd  ng }g }g }fddt D t |tfddtD }xtdd D ]xt fddD }tfddtD }	tfd	dD }
|	| |	|	 |	|
 qW tfd
dtD }t
j||f|| |  }tjd |gd}t|d j}|dk	rH|j krHfddt|D }t|| fddt| D }x4t|D ](}td || f|f|d |f< qW tjd ||gd}t|d |dg|d  }n|}dj d  |_|S )z Order divisions of DataFrame so that all values within column align

    This enacts a task-based shuffle

    See also:
        rearrange_by_column_disk
        set_partitions_tasks
        shuffle_tasks
    rW   r   c                s(   g | ]  t  fd dtD qS )c             3   s   | ]}t  |V  qd S )N)r   )r!   rV   )r+   kr    r#   r-   S  s    z7rearrange_by_column_tasks.<locals>.<listcomp>.<genexpr>)r4   rU   )r!   )r   stages)r+   r#   r$   S  s   z-rearrange_by_column_tasks.<locals>.<listcomp>c             3   s:   | ]2\}}d  d|f| j k r* j|fn jfV  qdS )zshuffle-join-r   N)r8   r2   rk   )r!   r+   inp)rM   r   r    r#   r-   X  s   z,rearrange_by_column_tasks.<locals>.<genexpr>c             3   s>   | ]6}d  |ft d d |f d ffV  qdS )zshuffle-group-zshuffle-join-r   N)shuffle_group)r!   r   )rl   r   rR   stager   r    r#   r-   ]  s   c             3   s<   | ]4} D ]*}d  ||ft d |f|ffV  q
qdS )zshuffle-split-zshuffle-group-N)r   )r!   r+   r   )inputsr   r   r    r#   r-   b  s   c             3   s:   | ]2 d   ft  fddtD ffV  qdS )zshuffle-join-c          	      s0   g | ](}d   d  t  d |fqS )zshuffle-split-r   )r   )r!   rV   )r   r   r   r    r#   r$   i  s   z7rearrange_by_column_tasks.<locals>.<genexpr>.<listcomp>N)r   rU   )r!   )r   r   r   )r   r#   r-   g  s   c             3   s,   | ]$\}}d  |fd  |ffV  qdS )zshuffle-zshuffle-join-Nr    )r!   r+   r   )r   r   r    r#   r-   p  s   zshuffle-)r   Nc                s   g | ]}| j  qS r    )r8   )r!   r+   )rM   r    r#   r$   y  s    c                s$   i | ]\}}t | fd  |fqS )zrepartition-group-)shuffle_group_2)r!   r+   r   )rl   r   r    r#   r   |  s   z-rearrange_by_column_tasks.<locals>.<dictcomp>zrepartition-group-zrepartition-get-)N)r8   rF   r?   r@   logrU   r   r   r   r   r   r   r   r   r   r,   r   shuffle_group_get)rM   rl   r[   r8   groupsZsplitsZjoinsstartgroupsplitjoinendZdskr   rg   rO   r   Zgraph2rh   r    )rl   rM   r   r   rR   r   r   r   r#   rp   <  sR    





(rp   c             C   s   t | ddt| S )a~  
    Computes a deterministic index mapping each record to a partition.

    Identical rows are mapped to the same partition.

    Parameters
    ----------
    df : DataFrame/Series/Index
    npartitions : int
        The number of partitions to group into.

    Returns
    -------
    partitions : ndarray
        An array of int64 values mapping each record to a partition.
    F)r3   )r   rF   )rM   r8   r    r    r#   rj     s    rj   c             C   s   t |  dS )Nr   )r5   )rz   r    r    r#   r     s    r   c             C   s   |  |}t|dkr|S |S )z1 Collect partitions from partd, yield dataframes r   )rn   rA   )r   r"   rX   r   Zresr    r    r#   r     s    
r   c             C   s6   t |j| ddd }t|d || |d kj< |S )Nright)Zsider   r   r.   )r;   r	   ZsearchsortedrA   r   )sr,   rf   r    r    r#   r^     s    r^   c                s   t | si | fS | | jtj}| d }t|tj|\}}| | |	 } fddt
|d d |dd  D }tt
t||}|| jd d fS )Nr   c                s   g | ]\}} j || qS r    )iloc)r!   ab)rg   r    r#   r$     s    z#shuffle_group_2.<locals>.<listcomp>r.   r   )rA   _valuesrE   rB   int64r(   r   ZviewtakecumsumrI   r   rU   r   )rM   rq   indrR   indexer	locationsrO   Zresult2r    )rg   r#   r     s    
(r   c             C   s    | \}}||kr|| S |S d S )Nr    )Zg_headr+   gheadr    r    r#   r     s    r   c                s   |dkr| | }nt | | dd}|j}t|d }t||j|dd}tj||| |d tj|||d t|tj|\}}	| 	| |	
 }	 fddt|	d	d
 |	dd	 D }
ttt||
S )z Splits dataframe into groups

    The group is determined by their final partition, and which stage we are in
    in the shuffle
    rY   F)r3   r   )copy)outc                s   g | ]\}} j || qS r    )r   )r!   r   r   )rg   r    r#   r$     s    z!shuffle_group.<locals>.<listcomp>Nr.   r   )r   r   rB   Zmin_scalar_typemodrE   Zfloor_divider   r   r   r   rI   r   rU   )rM   rq   r   r   r8   r   ctypr   r   rO   r    )rg   r#   r     s    

(r   c                s0   |  |  fdd jD }|j|dd d S )Nc                s   i | ]}  ||qS r    )Z	get_group)r!   r+   )r   r    r#   r     s    z#shuffle_group_3.<locals>.<dictcomp>T)fsync)groupbyr   r   )rM   rq   r8   r   dr    )r   r#   r     s    
r   c             C   s*   | j dddj||d}|j||_|S )NrY   r   )axis)r/   )r/   rT   rb   rE   )rM   r\   r/   r]   rg   r    r    r#   ra     s    ra   c             C   s2   | j dddjddd}||j_|j||_|S )NrY   r   )r   rZ   T)r/   )r/   rT   r3   re   rb   rE   )rM   r\   r/   r]   rg   r    r    r#   rd     s    rd   c             K   s   t |ts| jj||d}n| jj|j|d}ttj| |||d}|sTt|f|}nt|t| jkrrd}t	|t
||_|S )N)r/   )r/   rX   ae  When doing `df.set_index(col, sorted=True, divisions=...)`, divisions indicates known splits in the index column. In this case divisions must be the same length as the existing divisions in `df`

If the intent is to repartition into new divisions after setting the index, you probably want:

`df.set_index(col, sorted=True).repartition(divisions=divisions)`)r1   r	   rk   rT   r   r   compute_divisionsrA   r,   rD   r4   )rM   r3   r/   r,   rN   rX   rS   msgr    r    r#   rJ     s    

rJ   c             K   s   | j jtj| j d}| j jtj| j d}t||f|\}}t|t|ksrt|t|ksrtdd t	||D r~t
d||t|t|d f }|S )N)rX   c             s   s   | ]\}}||kV  qd S )Nr    )r!   r   r   r    r    r#   r-     s    z$compute_divisions.<locals>.<genexpr>z2Partitions must be sorted ascending with the indexr.   )r3   r   r   r&   r(   r   rH   r5   anyrI   rD   r4   )rM   rN   rP   rQ   r,   r    r    r#   r     s    r   )NNFTr   Nr   )rW   TNN)NNrW   N)NN)NNNN)NF)rW   N)TN)?Z
__future__r   r   r   r?   operatorr   r   ZnumpyrB   Zpandasr;   r   methodsr   Zcorer   r	   r
   r   r   Zhashingr   Zutilsr    r   r   r   r   r   r   Zhighlevelgraphr   r   r   r   r   Zpandas._libs.algosr   Zpandas.algosrT   rG   rL   r0   rm   r`   objectrr   ro   rp   rj   r   r   r^   r   r   r   r   ra   rd   rJ   r   r    r    r    r#   <module>   sZ     
F 
; 
!
 


,
T
