B
    T\3                 @   s  d dl mZmZmZ d dlZd dlmZ d dlZd dlZd dl	Z	d dl
mZ d dlZd dlZddlmZmZ ddlmZmZmZ dd	lmZ dd
lmZ ddlmZmZ ddlmZ ddlm Z  ddl!m"Z" ddlm#Z#m$Z$ ddlm%Z%m&Z& dZ'dd Z(dd Z)dIddZ*dd Z+dJddZ,dd Z-dd  Z.d!d" Z/d#d$ Z0d%d& Z1dKd(d)Z2d*d+ Z3dLd,d-Z4d.d/ Z5d0d1 Z6d2d3 Z7d4d5d6d7d8d9d:d;hZ8d5d8d9hZ9dMd<d=Z:dNd>d?Z;i Z<d@dA Z=dOdCdDZ>dPdGdHZ?ere?j@ej?_@dS )Q    )absolute_importdivisionprint_functionN)OrderedDict)LooseVersion   )	DataFrameSeries)clear_known_categoriesstrip_unknown_categoriesUNKNOWN_CATEGORIES   )compress)tokenize)PY3string_types)delayed)get_fs_token_paths)infer_storage_options)import_requirednatural_sort_key   )_get_pyarrow_dtypes_meta_from_dtypes)read_parquet
to_parquetc                s   | d }t d}dd | d D }g }x0|D ](\}}|rH||rHd}|||f q.W dd |D }| dd	dig}d
d |D }|st|}t|  fdd|D }	ndd |D }	t|}
||	|
|fS )aP  Get the set of names from the pandas metadata section

    Parameters
    ----------
    pandas_metadata : dict
        Should conform to the pandas parquet metadata spec

    Returns
    -------
    index_names : list
        List of strings indicating the actual index names
    column_names : list
        List of strings indicating the actual column names
    storage_name_mapping : dict
        Pairs of storage names (e.g. the field names for
        PyArrow) and actual names. The storage and field names will
        differ for index names for certain writers (pyarrow > 0.8).
    column_indexes_names : list
        The names for ``df.columns.name`` or ``df.columns.names`` for
        a MultiIndex in the columns

    Notes
    -----
    This should support metadata written by at least

    * fastparquet>=0.1.3
    * pyarrow>=0.7.0
    Zindex_columnsz__index_level_\d+__c             S   s$   g | ]}| d |d |d fqS )Z
field_namename)get).0x r    8lib/python3.7/site-packages/dask/dataframe/io/parquet.py
<listcomp>>   s   z*_parse_pandas_metadata.<locals>.<listcomp>columnsNc             S   s   g | ]\}}||kr|qS r    r    )r   storage_namer   r    r    r!   r"   I   s    Zcolumn_indexesr   c             S   s   g | ]}|d  qS )r   r    )r   r   r    r    r!   r"   P   s    c                s   g | ]\}}| kr|qS r    r    )r   r$   r   )index_storage_names2r    r!   r"   \   s    c             S   s   g | ]\}}||kr|qS r    r    )r   r$   r   r    r    r!   r"   b   s    )recompilematchappendr   listsetdict)pandas_metadataZindex_storage_namesZindex_name_xprZpairsZpairs2r$   Z	real_nameindex_namescolumn_index_namescolumn_namesstorage_name_mappingr    )r%   r!   _parse_pandas_metadata   s(    


r2   c                s   | dk	}|dk	}t }| dkr&t|} nt| tr<| g} t}nt| } |dkrR|}n0|dkrhg }|| }nt|trz|g}nt|}|r|s|fdd|D  nP|r|s|   fdd|D n0|r|r|  |t rtdn| | |fS )am  Normalize user and file-provided column and index names

    Parameters
    ----------
    user_columns : None, str or list of str
    data_columns : list of str
    user_index : None, str, or list of str
    data_index : list of str

    Returns
    -------
    column_names : list of str
    index_names : list of str
    out_type : {pd.Series, pd.DataFrame}
    NFc                s   g | ]}| kr|qS r    r    )r   r   )r.   r    r!   r"      s    z,_normalize_index_columns.<locals>.<listcomp>c                s   g | ]}| kr|qS r    r    )r   r   )r0   r    r!   r"      s    z3Specified index and column names must not intersect)r   r*   
isinstancer   r	   r+   intersection
ValueError)Zuser_columnsZdata_columnsZ
user_indexZ
data_indexZspecified_columnsZspecified_indexout_typer    )r0   r.   r!   _normalize_index_columnsj   s>    




r7   c                s\  dd l t|jjr|nt|dkr\|dk	rFj|jjdqt||||dS nTy&j|d j d jjdW n, tk
r   j|d jjdY nX t	j
jd dkr|dkrtd	t||\}}	 	fd
djD }
dt||   	fddt|
D }|shdf|i}d}|||S r|dk	r|jj}	yj}W n" tk
r   j}Y nX |	|kr||	 }|d |d d g }n*|dkrtdj|	ddt|
d  }n"|dkrtddt|
d  }t|d tjrNdd |D }|||S )Nr   r   F)	open_withsep)r#   
categoriesindex	_metadataTzzinfer_divisions=True is not supported by the fastparquet engine for datasets that do not contain a global '_metadata' filec                s0   g | ](} j |js j |s|qS r    )apiZfilter_out_statsschemaZfilter_out_cats)r   rg)fastparquetfilterspfr    r!   r"      s    z%_read_fastparquet.<locals>.<listcomp>zread-parquet-c                sF   i | ]>\}}t | |tkjjjjf|fqS r    )_read_parquet_row_groupZrow_group_filenamer	   r?   catsdtypesfile_scheme)r   ir@   )all_columnsr:   fsr.   r   r6   rC   r1   r    r!   
<dictcomp>   s   z%_read_fastparquet.<locals>.<dictcomp>)NNminmaxzlUnable to infer divisions for index of '{index_name}' because it is not known to be sorted across partitions)
index_name)NzDUnable to infer divisions for because no index column was discoveredc             S   s   g | ]}t |qS r    )pd	Timestamp)r   dr    r    r!   r"      s    )rA   r3   r>   ParquetFilelenopenr9   _read_fp_multifile	ExceptionospathsplitfnNotImplementedError_pf_validation
row_groupsr   	enumerater;   r   sorted_partitioned_columns	TypeErrorr5   formatnpZ
datetime64)rJ   fs_tokenpathsr#   rB   r:   r;   infer_divisionsmetarN   rgsdsk	divisionsminmaxr    )
rI   r:   rA   rB   rJ   r.   r   r6   rC   r1   r!   _read_fastparquet   s`    




rk   c                s  ddl m} || j| || jt| jp(g  | t|trDt|}| jjr`dd | jjD }ng }t	|dkr| 
 }t|ts|g}| jt| j  dd  D n<t	|dkrtt|d \} } | j ntd|d	krg }t| ||\ }}	|d	kr| j}nt|tr*|g}nt|}t }
|
 fd
d|D  | |}fdd| D }t|
||d	g}t	|dkrtdnt	|dkrd	}x8|D ]0}||krtjtjg tgd|jd||< qW xZ| jD ]P}||jkr|| j| j| ||< n"|jj|kr|j| j| |_qW |	tkrnt	|jdks`t||jd  }||||	|
|fS )aV  Validate user options against metadata in dataset

     columns, index and categories must be in the list of columns available
     (both data columns and path-based partitioning - subject to possible
     renaming, if pandas metadata is present). The output index will
     be inferred from any available pandas metadata, if not given.
     r   )check_column_namesc             S   s   g | ]}|j d kr|jqS )pandas)keyvalue)r   r   r    r    r!   r"     s    z"_pf_validation.<locals>.<listcomp>c             S   s   i | ]
}||qS r    r    )r   kr    r    r!   rK     s    z"_pf_validation.<locals>.<dictcomp>r   z/File has multiple entries for 'pandas' metadataNc             3   s   | ]}| kr|V  qd S )Nr    )r   r   )r0   r    r!   	<genexpr>6  s    z!_pf_validation.<locals>.<genexpr>c                s   i | ]\}}|  ||qS r    )r   )r   rp   v)r1   r    r!   rK   9  s    z&Cannot read DataFrame with MultiIndex.)r:   )r;   )fastparquet.utilrl   r#   r*   rE   r3   tuplefmdZkey_value_metadatarS   Z
_get_indexr2   jsonloadsextendr5   r7   r:   r   Z_dtypesitemsr   rO   r	   ZCategoricalr   r;   catZset_categoriesr   AssertionError)rC   r#   r;   r:   rB   rl   Z	pandas_mdr.   r/   r6   rI   rF   rf   rz   Zcatcolr    )r0   r1   r!   r\     sf    








r\   c          	      s   ddl m} ddlm}m}m ||\}	fdd|D }
||	}||d jd|_t|	|_	t
||g \}}} 	dt||   	f	dd	t|
D }d
t|d  }|||S )zBRead dataset with fastparquet by assuming metadata from first filer   )rR   )analyse_pathsget_file_scheme	join_pathc                s   g | ]} |qS r    r    )r   p)r~   r    r!   r"   \  s    z&_read_fp_multifile.<locals>.<listcomp>)r8   zread-parquet-c                s8   i | ]0\}}t | tkjjf|fqS r    )_read_pf_simpler	   rE   rG   )r   rH   rX   )	rI   baser:   rJ   r.   r   r6   rC   r1   r    r!   rK   f  s   z&_read_fp_multifile.<locals>.<dictcomp>)Nr   )rA   rR   rs   r|   r}   r~   rT   rG   _paths_to_catsrE   r\   r   r^   rS   )rJ   rc   rd   r#   r:   r;   rR   r|   r}   ZfnsZparsed_pathsschemerf   _rN   rh   ri   r    )
rI   r   r:   rJ   r.   r~   r   r6   rC   r1   r!   rU   V  s     
rU   c
                s   ddl m}
 |
|| jd}||dd}x$|jD ]}x|jD ]
}||_q@W q4W ||_||_	||_
|j|| d}|jjdkr r d  d |j_n rfdd	 D |j_ fd
d	|D |_|r||jd  S |S dS )z9Read dataset with fastparquet using ParquetFile machineryr   )rR   )r8    /)r;   r   c                s   g | ]}  ||qS r    )r   )r   r   )r1   r    r!   r"     s   z#_read_pf_simple.<locals>.<listcomp>c                s$   g | ]}| pg kr ||qS r    )r   )r   col)r.   r1   r    r!   r"     s   N)rA   rR   rT   replacelstripr]   r#   	file_pathrG   rE   rZ   	to_pandasr;   nlevelsr   r   names)rJ   rX   r   r.   rI   	is_seriesr:   rE   r   r1   rR   rC   relpathr@   chdfr    )r.   r1   r!   r   o  s,    


r   c             C   s  ddl m}m}m} t }t }x| D ]}|d}|dkr||}	x|	D ]4\}
}||
t || ||
t | qJW q&xVt	|
ddd D ]<\}}d| }
||
t || ||
t | qW q&W x| D ]\}
}||
 }t|t|kr\t }x*||
 D ]}|||t | qW dd	 | D }td
| ||}t|dkrddl}dd	 | D }|d|  qW dd | D S )z2Extract out fields and labels from directory namesr   )ex_from_sep
val_to_numgroupby_typesr   hiveNr=   zdir%ic             S   s&   g | ]}t |d kr|D ]}|qqS )r   )rS   )r   rp   cr    r    r!   r"     s    z"_paths_to_cats.<locals>.<listcomp>z)Partition names map to the same value: %sr   c             S   s   g | ]}|d  qS )r   r    )r   r   r    r    r!   r"     s    z<Partition names coerce to values of different types, e.g. %sc             S   s   i | ]\}}t ||qS r    )r*   )r   rp   rr   r    r    r!   rK     s    z"_paths_to_cats.<locals>.<dictcomp>)rs   r   r   r   r   findall
setdefaultr+   addr^   rY   ry   rS   valuesr5   warningswarn)rd   r   r   r   r   rE   Zraw_catsrX   s
partitionsrn   valrH   rr   rawZconflicts_by_valueZraw_valZ	conflictsZvals_by_typer   Zexamplesr    r    r!   r     s>    

 
r   c                sh  ddl m} ddlm} dd  D t ttfsB g d}r^\ kr^ g  fdd D  | fd	d| D }||| j	d
}|	|_
x2|jD ](}x"|jD ]}||dd|_qW qW ||_|j |d}|jjdkrr6|j_nr6fddD |j_fdd D |_|r`||jd  S |S dS )z9Read a single file with fastparquet, to be used in a taskr   )rR   )r   c             S   s   i | ]\}}||qS r    r    )r   rp   rr   r    r    r!   rK     s    z&_read_parquet_file.<locals>.<dictcomp>Tc                s   g | ]}  ||qS r    )r   )r   r   )name_storage_mappingr    r!   r"     s    z&_read_parquet_file.<locals>.<listcomp>c                s    g | ]\}}| kr||fqS r    r    )r   rp   rr   )r#   r    r!   r"     s    )r8   r   r   )r#   r;   r:   r   c                s   g | ]}  ||qS r    )r   )r   r   )r1   r    r!   r"     s   c                s    g | ]}| kr ||qS r    )r   )r   r   )r;   r1   r    r!   r"     s   N)fastparquet.apirR   collectionsr   ry   r3   rt   r*   r   rT   rG   r]   r#   r   r   r   rZ   r   r;   r   r   r   )rJ   r   rZ   r;   r#   seriesr:   csdtr   r1   argsrR   r   rC   r@   r   r   r    )r#   r;   r   r1   r!   _read_parquet_file  s>    


r   c                sF  ddl m} ddlm} ddlm} dd  D t tt	fsN g d}rj\ krj g  fdd	 D  
| fd
d	| D }||j |||	\}}||| |||| j||
d	 |jjdkrr
|j_nrfdd	D |j_fdd	 D |_|r>||jd  S |S d S )Nr   )_pre_allocate)read_row_group_file)r   c             S   s   i | ]\}}||qS r    r    )r   rp   rr   r    r    r!   rK     s    z+_read_parquet_row_group.<locals>.<dictcomp>Tc                s   g | ]}  ||qS r    )r   )r   r   )r   r    r!   r"     s    z+_read_parquet_row_group.<locals>.<listcomp>c                s    g | ]\}}| kr||fqS r    r    )r   rp   rr   )r#   r    r!   r"     s    )rT   Zassignr   r   c                s   g | ]}  ||qS r    )r   )r   r   )r1   r    r!   r"     s   c                s    g | ]}| kr ||qS r    )r   )r   r   )r;   r1   r    r!   r"     s   )r   r   Zfastparquet.corer   r   r   ry   r3   rt   r*   r   num_rowsrT   r;   r   r   r   r#   )rJ   rZ   r;   r#   r@   r   r:   r?   r   r   r   r1   r   r   r   r   r   Zviewsr    )r#   r;   r   r1   r!   rD     s8    


rD   c          
   C   s   ddl m}m} dd l}	t|}t| s0d }
n|r~t|	jdkr^|| ||||||j|j	}
q|| |||||j
||j|j		}
nBt| |_||j
||gd}||| |j||d}
W d Q R X |
S )Nr   )partition_on_columnsmake_part_filez0.1.4wb)compressionru   )Zfastparquet.writerr   r   rA   copyrS   r   __version__rT   mkdirsr9   r   joinr?   )r   rJ   rX   filenameru   r   partition_onr   r   rA   rg   filr    r    r!   _write_partition_fastparquet  s     

r   Fc	          	      s`  dd l }
 j}|	dd}|dksBt|trJd| krJtd| j}|dksf|d kr|| j	r|| 
 } | jd g}nd}g }|ry|
jjj|d}W n ttfk
r   d}Y nX |r|jd	krtd
nt|jt| jt ks
tt|jkr$td|jt| jn\t|jj|j | |j jk rrtdt|j t| j A n| |j  } |j|
jj|s |
j |}||d  d d }|d |k r td||d n"|
jj!| j"f||d|	dfddt#| j$D }t%t&dd fddt'|| ( D }t%t)|||S )Nr   object_encodingutf8ZinferzM"infer" not allowed as object encoding, because this required data in memory.T)r8   r9   F)r   emptyZflatz?Requested file scheme is hive, but existing file scheme is not.z5Appended columns not the same.
Previous: {} | New: {}zAppended dtypes differ.
{}rM   r=   zMAppended divisions overlapping with the previous ones.
Previous: {} | New: {})r   
index_colsZignore_columnsc                s   g | ]}d |   qS )zpart.%i.parquetr    )r   rH   )i_offsetr    r!   r"   d  s   z&_write_fastparquet.<locals>.<listcomp>)purec          
      s$   g | ]\}}|| qS r    r    )r   r   part)r   ru   rJ   r   rX   writer    r!   r"   h  s   )*rA   r   r9   popr3   r,   r   r5   ri   known_divisionsreset_indexr#   r>   rR   rT   IOErrorrG   r+   rE   ra   r*   rO   r	   rF   Zlocanyry   Z	iteritemsru   writerZfind_max_partr]   r_   Zmake_metadataZ_metarangeZnpartitionsr   r   zip
to_delayed_write_metadata)r   rJ   rc   rX   write_indexr)   ignore_divisionsr   r   kwargsrA   r9   r   ri   r   rC   rj   Zold_end	filenameswritesr    )r   ru   rJ   r   r   rX   r   r!   _write_fastparquet(  sb    



2&
r   c             C   s   ddl }t|}xft|| D ]X\}}|dk	rt|trTx<|D ]}	|j|	 q>W qx|jD ]
}
||
_q\W |j| qW |	|dg}|j
j|||jdd |	|dg}|j
j|||jd dS )zc Write Parquet metadata after writing all row groups

    See Also
    --------
    to_parquet
    r   Nr<   F)r8   Zno_row_groups_common_metadata)r8   )rA   r   r   r3   r*   r]   r)   r#   r   r   r   Zwrite_common_metadatarT   )r   r   ru   rX   rJ   r9   rA   rZ   r@   rchunkr    r    r!   r   n  s    



r   c                s  ddl m} dd lm t tr* g n d kr8g  nt  t|trRt|}j|||dj	d k	rdd j	j
D }	ng }	j }
|
jd k	od|
jk}|rt|
jd d}t|\}ng |
jd	d
 D d g}fdd|	D 7 t||\ }g }fdd}x0jD ]&}||}|jdkr<|| q<W t|dkrtjdkrt|dd d}tdkr·fdd D }|r|d }nd }nd }t|||
|}t|
 }fdd
| D }t|||}t| d}tkrFt|j dks8t!||j d  }dt"||| |r fdd
t#|D }nt$|}df|i}|||S )Nr   )get_pyarrow_filesystemr   )
filesystemrB   c             S   s   g | ]}|d k	r|qS )Nr    )r   nr    r    r!   r"     s    z!_read_pyarrow.<locals>.<listcomp>s   pandasr   c             S   s   i | ]
}||qS r    r    )r   rp   r    r    r!   rK     s    z!_read_pyarrow.<locals>.<dictcomp>c                s   g | ]}| kr|qS r    r    )r   r   )r0   r    r!   r"     s    c                s     j| ddS )Nrb)mode)rR   rT   )rZ   )rJ   pqr    r!   <lambda>  s    z_read_pyarrow.<locals>.<lambda>r   c             S   s
   t | jS )N)r   rX   )piecer    r    r!   r     s    )rn   c                s    g | ]\}} d  |kr|qS )r   r    )r   r$   r   )r.   r    r!   r"     s    c                s   i | ]\}}|  ||qS r    )r   )r   rp   rr   )r1   r    r!   rK     s    )Zcolszread-parquet-c          
      s0   i | ](\}}t |tkj f|fqS r    )_read_pyarrow_parquet_piecer	   r   )r   rH   r   )r:   r0   datasetrJ   r.   r6   	task_namer    r!   rK     s   	)%
bytes.corer   pyarrow.parquetparquetr3   r   r*   rt   ZParquetDatasetr   Zpartition_namesr?   Zto_arrow_schemaZmetadatarv   rw   decoder2   r   r7   piecesget_metadataZnum_row_groupsr)   rS   sortedry   _get_pyarrow_divisionsr   r   r
   r	   r#   r{   r   r^   r   )rJ   rc   rd   r#   rB   r:   r;   re   r   r   r?   Zhas_pandas_metadatar-   r/   rI   Znon_empty_piecesZ_openr   rC   Zdivisions_namesdivisions_nameri   rF   rf   Z	task_planr    )	r:   r0   r   rJ   r.   r6   r   r1   r   r!   _read_pyarrow  sx    








	r   c             C   sV   t dt dt ddd}y||}W n$ tk
rL   tdj|dY nX | | S )a  
    Convert an input time in the specified units to nanoseconds

    Parameters
    ----------
    val: int
        Input time value
    unit : str
        Time units of `val`.
        One of 's', 'ms', 'us', 'ns'

    Returns
    -------
    int
        Time val in nanoseconds
    g    eAg    .Ag     @@r   )r   ZmsusnszUnsupported time unit '{unit}')unit)intr   KeyErrorr5   ra   )r   r   ZfactorsZfactorr    r    r!   _to_ns  s    r   c          	      s  ddl }ddlm} |dkr2|jtdk r2td|dkrj|dk	oN||dk}|dkrn|dkrntdnd}| r|rg }d}x| D ]}	|	|j	}
|

dfdd	tjD }y||}W n tk
r   d}P Y nX |}|j}|jr,|dks||jk r,||j|jf |j}qd}P qW |rd
d	 |D |d d g }||}|j|jr|jjfdd	|D }dd	 |D }|j| krd  fdd	|D }n*|dkrtdj|ddt| d  }n| rdt| d  }nd}|S )a{  
    Compute DataFrame divisions from a list of pyarrow dataset pieces

    Parameters
    ----------
    pa_pieces : list[pyarrow.parquet.ParquetDatasetPiece]
        List of dataset pieces. Each piece corresponds to a single partition in the eventual dask DataFrame
    divisions_name : str|None
        The name of the column to compute divisions for
    pa_schema : pyarrow.lib.Schema
        The pyarrow schema for the dataset
    infer_divisions : bool or None
        If True divisions must be inferred (otherwise an exception is raised). If False or None divisions are not
        inferred
    Returns
    -------
    list
    r   NTz0.9.0z-infer_divisions=True requires pyarrow >=0.9.0FzDUnable to infer divisions for because no index column was discoveredc                s   g | ]}  |jqS r    )columnZpath_in_schema)r   rH   )r@   r    r!   r"   M  s    z*_get_pyarrow_divisions.<locals>.<listcomp>c             S   s   g | ]\}}|qS r    r    )r   ZmnZmxr    r    r!   r"   a  s    r=   r   c                s   g | ]}t | qS r    )r   )r   rQ   )	time_unitr    r!   r"   g  s    c             S   s   g | ]}t |qS r    )rO   rP   )r   r   r    r    r!   r"   i  s    zutf-8c                s   g | ]}|   qS r    )r   strip)r   rQ   )encodingr    r!   r"   o  s    zlUnable to infer divisions for index of '{index_name}' because it is not known to be sorted across partitions)rN   )N)NN)pyarrowr   r   r   r   r[   Zget_field_indexr5   r   rR   Z	row_groupr   Znum_columnsr;   r   Z
statisticsZhas_min_maxrL   r)   rM   Zfield_by_nametypesZis_timestamptyper   stringra   rS   )Z	pa_piecesr   Z	pa_schemare   par   Zdivisions_name_in_schemaZmin_maxsZlast_maxr   rC   Zrg_pathsZdivisions_col_indexZcol_metaZstatsri   Zindex_fieldZdivisions_nsr    )r   r@   r   r!   r     sb    






r   c          	   C   sD  dd l }| j|jdd}|j|| |d|d}	W d Q R X |jtdk rr|	 }
xJ|D ]}|
| d|
|< qVW n*|jtdk r|	j|d	}
n|	j|d
d}
t|
j	t
j }|s|r|
|}
nd|r$|
j	j|kr$|
jd
d}
|r|
|}
tt|
j|}|r|
j|dd}
|
j|d
d}
|r8|
|
jd  S |
| S d S )Nr   r   )r   T)r#   r   Zuse_pandas_metadatafilez0.9.0categoryz0.11.0)r:   F)r:   Zdate_as_object)dropr   )Zaxis)r#   r   )r   rT   rX   readr   r   r   Zastyper3   r;   rO   Z
RangeIndexZ	set_indexr   r   r*   r+   r#   
differencer   Zreindex)rJ   r   r#   r   r   r   r:   r   ftabler   rz   Z	has_indexr   r    r    r!   r     s6    


r   Zrow_group_sizeversionZuse_dictionaryr   Zuse_deprecated_int96_timestampsZcoerce_timestampsZflavorZ
chunk_sizec                s   |rt d|rt dttrHddttt  }	t|	d krZ| jrZd j	dgt
tdd  j	d	g d
<  fddt|  D }
t
|
S )Nz/`append` not implemented for `engine='pyarrow'`z9`ignore_divisions` not implemented for `engine='pyarrow'`zUnexpected keyword arguments: z%rTzpart.%i.parquetF)r   r   metadata_pathc          	      s2   g | ]*\}}|| f|r(n qS r    r    )r   rH   r   )first_kwargsrJ   r   r   rX   templater   r   r    r!   r"     s   z"_write_pyarrow.<locals>.<listcomp>)r[   r+   r   _pyarrow_write_table_kwargsr*   r`   r   r   r9   r   r   _write_partition_pyarrowr   r^   r   )r   rJ   rc   rX   r   r)   r   r   r   msgr   r    )r   rJ   r   r   rX   r  r   r   r!   _write_pyarrow  s$    
r  c          	   K   s   dd l }ddl m}	 |jj| |d}
|rF|	j|
|f|||d| n(||d}|	j|
|f| W d Q R X |d k	r||d*}dd | D }|	j|
j	|f| W d Q R X d S )Nr   )r   )preserve_index)Zpartition_colsr  r   r   c             S   s   i | ]\}}|t kr||qS r    )_pyarrow_write_metadata_kwargs)r   rp   rr   r    r    r!   rK     s    z,_write_partition_pyarrow.<locals>.<dictcomp>)
r   r   ZTableZfrom_pandasZwrite_to_datasetrT   Zwrite_tablery   Zwrite_metadatar?   )r   rX   rJ   r   r   r   r   r   r   r   tr   Zkwargs_metar    r    r!   r    s    r  c          	   C   s   | t krt |  S | dkrNxdD ]"}yt|S  tk
r>   Y qX qW tdnx| dkrvtdd ttd t d< }|S | dkrtdd}t|jd	k rtd
tt	d t d< }|S t
d| d dS )a8  Get the parquet engine backend implementation.

    Parameters
    ----------
    engine : {'auto', 'fastparquet', 'pyarrow'}, default 'auto'
        Parquet reader library to use. Default is first installed in this list.

    Returns
    -------
    A dict containing a ``'read'`` and ``'write'`` function.
    auto)rA   r   z,Please install either fastparquet or pyarrowrA   z`fastparquet` not installed)r   r   r   z`pyarrow` not installedz0.8.0z!PyArrow version >= 0.8.0 requiredzUnsupported engine: "{0}".z4  Valid choices include "pyarrow" and "fastparquet".N)_ENGINES
get_engineRuntimeErrorr   rk   r   r   r   r   r  r5   ra   )engineZengr   r    r    r!   r    s.    





r  r	  c          
   C   s"  d}yXddl }	t| |	jjrZ| j|	jjkrFtd| j	sFt
d| j	 |dksVt
dd}W n tk
rp   Y nX |rtd	d
 }
| j	dr| j	dtd  }n| j	}t|d|d\}}}| }nFt|d
 }
t| d|d\}}}t| tr
t|dkr
t|td}|
||||||||dS )a  
    Read ParquetFile into a Dask DataFrame

    This reads a directory of Parquet data into a Dask.dataframe, one file per
    partition.  It selects the index among the sorted columns if any exist.

    Parameters
    ----------
    path : string, list or fastparquet.ParquetFile
        Source directory for data, or path(s) to individual parquet files.
        Prefix with a protocol like ``s3://`` to read from alternative
        filesystems. To read from multiple files you can pass a globstring or a
        list of paths, with the caveat that they must all have the same
        protocol.
        Alternatively, also accepts a previously opened
        fastparquet.ParquetFile()
    columns : string, list or None (default)
        Field name(s) to read in as columns in the output. By default all
        non-index fields will be read (as determined by the pandas parquet
        metadata, if present). Provide a single field name instead of a list to
        read in the data as a Series.
    filters : list
        List of filters to apply, like ``[('x', '>', 0), ...]``. This implements
        row-group (partition) -level filtering only, i.e., to prevent the
        loading of some chunks of the data, and only if relevant statistics
        have been included in the metadata.
    index : string, list, False or None (default)
        Field name(s) to use as the output frame index. By default will be
        inferred from the pandas parquet file metadata (if present). Use False
        to read all fields as columns.
    categories : list, dict or None
        For any fields listed here, if the parquet encoding is Dictionary,
        the column will be created with dtype category. Use only if it is
        guaranteed that the column is encoded as dictionary in all row-groups.
        If a list, assumes up to 2**16-1 labels; if a dict, specify the number
        of labels expected; if None, will load categories automatically for
        data written by dask/fastparquet, not otherwise.
    storage_options : dict
        Key/value pairs to be passed on to the file-system backend, if any.
    engine : {'auto', 'fastparquet', 'pyarrow'}, default 'auto'
        Parquet reader library to use. If only one library is installed, it
        will use that one; if both, it will use 'fastparquet'
    infer_divisions : bool or None (default).
        By default, divisions are inferred if the read `engine` supports
        doing so efficiently and the `index` of the underlying dataset is
        sorted across the individual parquet files. Set to ``True`` to
        force divisions to be inferred in all cases. Note that this may
        require reading metadata from each file in the dataset, which may
        be expensive. Set to ``False`` to never infer divisions.

    Examples
    --------
    >>> df = dd.read_parquet('s3://bucket/my-parquet-data')  # doctest: +SKIP

    See Also
    --------
    to_parquet
    Fr   Nz.*://zxParquetFile: Path must contain protocol (e.g., s3://...) when using other than the default LocalFileSystem. Path given: )r	  rA   z['engine' should be set to 'auto' or 'fastparquet' when reading from fastparquet.ParquetFileTrA   r   r<   r   )r   storage_optionsr   )rn   )r#   rB   r:   r;   re   )rA   r3   r>   rR   rT   utilZdefault_openr&   r(   rZ   r{   ImportErrorr  endswithrS   r   r   r   r   )rX   r#   rB   r:   r;   r  r  re   Zis_ParquetFilerA   r   ZurlpathrJ   rc   rd   r    r    r!   r     s<    <
r   defaultTc
             K   s   |pg }t |t | j r"td|dkr4||
d< ndtkrDd|
d< t|d }t|d|d\}}}t|d }|| |||f||||d	|
}|	r|  d
S |S )a  Store Dask.dataframe to Parquet files

    Notes
    -----
    Each partition will be written to a separate file.

    Parameters
    ----------
    df : dask.dataframe.DataFrame
    path : string
        Destination directory for data.  Prepend with protocol like ``s3://``
        or ``hdfs://`` for remote data.
    engine : {'auto', 'fastparquet', 'pyarrow'}, default 'auto'
        Parquet library to use. If only one library is installed, it will use
        that one; if both, it will use 'fastparquet'.
    compression : string or dict, optional
        Either a string like ``"snappy"`` or a dictionary mapping column names
        to compressors like ``{"name": "gzip", "values": "snappy"}``. The
        default is ``"default"``, which uses the default compression for
        whichever engine is selected.
    write_index : boolean, optional
        Whether or not to write the index. Defaults to True *if* divisions are
        known.
    append : bool, optional
        If False (default), construct data-set from scratch. If True, add new
        row-group(s) to an existing data-set. In the latter case, the data-set
        must exist, and the schema must match the input data.
    ignore_divisions : bool, optional
        If False (default) raises error when previous divisions overlap with
        the new appended divisions. Ignored if append=False.
    partition_on : list, optional
        Construct directory-based partitioning by splitting on these fields'
        values. Each dask partition will result in one or more datafiles,
        there will be no global groupby.
    storage_options : dict, optional
        Key/value pairs to be passed on to the file-system backend, if any.
    compute : bool, optional
        If True (default) then the result is computed immediately. If False
        then a ``dask.delayed`` object is returned for future computation.
    **kwargs
        Extra options to be passed on to the specific backend.

    Examples
    --------
    >>> df = dd.read_csv(...)  # doctest: +SKIP
    >>> to_parquet('/path/to/output/', df, compression='snappy')  # doctest: +SKIP

    See Also
    --------
    read_parquet: Read parquet data to dask.dataframe
    z#Partitioning on non-existent columnr  r   Zsnappyr   r   )r   r  rX   )r   r)   r   r   N)r+   r#   r5   r   r  r   r   compute)r   rX   r  r   r   r)   r   r   r  r  r   r   rJ   rc   r   outr    r    r!   r     s$    6
r   )NNNNN)NNN)NFFNN)NNNNN)NFFN)N)NNNNNr	  N)r	  r  NFFNNT)AZ
__future__r   r   r   r&   r   r   r   rv   rW   Zdistutils.versionr   Znumpyrb   rm   rO   Zcorer   r	   Zutilsr
   r   r   Zbytes.compressionr   r   r   Zcompatibilityr   r   r   r   r   Zbytes.utilsr   r   r   r   r   __all__r2   r7   rk   r\   rU   r   r   r   rD   r   r   r   r   r   r   r   r  r  r  r  r
  r  r   r   __doc__r    r    r    r!   <module>   sr   NG 
OU 
-*(  
D 
td)
 
 
. 
j  
O