B
    T\Hg              	   @   s4  d dl mZmZmZ d dlmZ d dlmZmZm	Z	 yd dl
Z
W n ek
rX   dZ
Y nX d dlZd dlZyd dlmZ dZW n ek
r   dZY nX dd	lmZmZ dd
lmZmZ ddlmZmZmZmZ ddlmZ ddl m!Z!m"Z" ddl m#Z#m$Z$ ddlm%Z% e$dkr,d dlm&Z&m'Z'm(Z(m)Z) nd dl*m&Z&m'Z'm(Z(m)Z) d/ddZ+dd Z,d0ddZ-dd Z.e
dk	re , e	de/ e
0 j1Z2e
3 Z4e.e2e4Z5W dQ R X ndZ5e5ddddddddf	dd Z6d!Z7d"d# Z8e8ej9d$d%Z9e8ej:d&d'Z:e8ej;d(d)Z;d*d+ Z<d1d,d-Z=er0dd.l>m?Z? e=j@e?j=_@dS )2    )print_functiondivisionabsolute_import)BytesIO)warncatch_warningssimplefilterN)CategoricalDtypeTF   )
read_bytes
open_files)seekable_filesfiles)PY2PY3Mappingunicode)delayed)
asciitableparse_bytes   )clear_known_categoriesPANDAS_VERSION   )from_delayedz0.20.0)is_integer_dtypeis_float_dtypeis_object_dtypeis_datetime64_any_dtypec	          	   C   s   t  }	|r"|| s"|	| |	| |	d | |	f|}
|rPt|
| |rz|rzt|
jt|krztd|
j|n
|r||
_|r|\}}}|	|}|
j
f |tjtt|
||i}
|
S )aS   Convert a block of bytes to a Pandas DataFrame

    Parameters
    ----------
    reader : callable
        ``pd.read_csv`` or ``pd.read_table``.
    b : bytestring
        The content to be parsed with ``reader``
    header : bytestring
        An optional header to prepend to ``b``
    kwargs : dict
        A dictionary of keyword arguments to be passed to ``reader``
    dtypes : dict
        DTypes to assign to columns
    path : tuple
        A tuple containing path column name, path to file, and all paths.

    See Also
    --------
    dask.dataframe.csv.read_pandas_from_bytes
    r   zColumns do not match)r   
startswithrstripwriteseekcoerce_dtypeslistcolumns
ValueErrorindexassignpdCategorical
from_codesnpZfulllen)readerbheaderkwargsdtypesr%   write_headerenforcepathZbiodfcolnamepathscode r:   4lib/python3.7/site-packages/dask/dataframe/io/csv.pypandas_read_text&   s$    





 r<   c             C   s  g }g }g }x| j D ]}||kr| j| || kr| j| }|| }t|rft|rf||||f qt|rt|r|| qy| | || | |< W q tk
r } z"||||f |||f W dd}~X Y qX qW |rp|rd	dd t
|dd dD }	d|	 }
d	}nd	}
d
}t
|dd d}tdddg|}dd	dd |D  }dj||
||d}nd}|r|rdnd}d	dd |D }dj||d}nd}|s|rdd }d|	td||g }t|dS )z Coerce dataframe to dtypes safely

    Operates in place

    Parameters
    ----------
    df: Pandas DataFrame
    dtypes: dict like {'x': float}
    N
c             s   s   | ]\}}d ||f V  qdS )z	- %s
  %rNr:   ).0cer:   r:   r;   	<genexpr>t   s    z coerce_dtypes.<locals>.<genexpr>c             S   s   t | d S )Nr   )str)xr:   r:   r;   <lambda>u   s    zcoerce_dtypes.<locals>.<lambda>)keyzAThe following columns also raised exceptions on conversion:

%s

 zf

Alternatively, provide `assume_missing=True` to interpret
all unspecified integer columns as floats.c             S   s   t | d S )Nr   )rB   )rC   r:   r:   r;   rD      s    ZColumnZFoundZExpectedz
dtype={%s}z	,
       c             s   s    | ]\}}}d ||f V  qdS )z%r: '%s'Nr:   )r>   kv_r:   r:   r;   rA      s   z{table}

{exceptions}Usually this is due to dask's dtype inference failing, and
*may* be fixed by specifying dtypes manually by adding:

{dtype_kw}

to the call to `read_csv`/`read_table`.{extra})table
exceptionsdtype_kwextraz also  c             s   s   | ]}d | V  qdS )z- %sNr:   )r>   r?   r:   r:   r;   rA      s    a  The following columns{also}failed to properly parse as dates:

{cols}

This is usually due to an invalid value in that column. To
diagnose and fix it's recommended to drop these columns from the
`parse_dates` keyword, and manually convert them to dates later
using `dd.to_datetime`.)alsocolsz

%s

z=-------------------------------------------------------------z=Mismatched dtypes found in `pd.read_csv`/`pd.read_table`.

%s)r%   r2   r   r   appendr   r   astype	Exceptionjoinsortedr   formatfilterr&   )r6   r2   Z
bad_dtypesZ	bad_dateserrorsr?   ZactualZdesiredr@   exrK   rM   rJ   rL   Z	dtype_msgrO   rP   Zdate_msgZrulemsgr:   r:   r;   r#   R   sZ    

$
r#   c	                s  |j  }	|jdgdj}
g }|
}trjt trN fdd|
D }|
|}nt trj j	dkrjg }|
}x|D ]}d|	|< qpW t
|j}ttdd}g }|pd\}}xt|D ]\}}|sq|r||| |f}nd}|| |d	 |||	|d
||d	}|| | }|dd x4|dd D ]$}||| ||||	|||d q W qW |r|r||jf |tjtt||i}t|rt||d}t||S |S dS )a   Convert blocks of bytes to a dask.dataframe or other high-level object

    This accepts a list of lists of values of bytes where each list corresponds
    to one file, and the value of bytes concatenate to comprise the entire
    file, in order.

    Parameters
    ----------
    reader : callable
        ``pd.read_csv`` or ``pd.read_table``.
    block_lists : list of lists of delayed values of bytes
        The lists of bytestrings where each list corresponds to one logical file
    header : bytestring
        The header, found at the front of the first file, to be prepended to
        all blocks
    head : pd.DataFrame
        An example Pandas DataFrame to be used for metadata.
        Can be ``None`` if ``collection==False``
    kwargs : dict
        Keyword arguments to pass down to ``reader``
    collection: boolean, optional (defaults to True)
    path : tuple, optional
        A tuple containing column name for path and list of all paths

    Returns
    -------
    A dask.dataframe or list of delayed values
    category)Zincludec                s0   g | ](}t  |tr |jd k	r|qS )N)
isinstancegetr	   
categories)r>   rG   )specified_dtypesr:   r;   
<listcomp>   s    z)text_blocks_to_pandas.<locals>.<listcomp>NT)pure)NNr   F)r3   r4   r5   skiprowsr   )r4   r5   )rP   )r2   Zto_dictZselect_dtypesr%   _HAS_CDTr\   r   
differencer	   r^   r$   r   r<   	enumeraterQ   copypopr(   r)   r*   r+   r,   Zzerosr-   r   r   )r.   Zblock_listsr0   headr1   
collectionr4   r_   r5   r2   ZcategoricalsZknown_categoricalsZunknown_categoricalsrG   r%   Zdelayed_pandas_read_textdfsr7   r8   iZblocksZ	path_infor6   Zrest_kwargsr/   r:   )r_   r;   text_blocks_to_pandas   sT    









rl   c             C   s"   d}t | | | }t|t dS )N
   g    A)intmin)Ztotal_memory	cpu_countZmemory_factor	blocksizer:   r:   r;   auto_blocksize  s    rr   ignorei   i  c                s  | j }|d k	r$t|dkr$||d< nd}|
r:t|
tr:d}
d|ksJd|krXtd|x$dD ]}||kr^td	||q^W |d
d rtd|t|dtr|d } }}nR|dd krd } }}n6t|d}t	|}t
ttt|d t| }t|dtr.td|t|dtrZ|
rZ|d|
d  nd  t|ttfrvt|}|r|tkrtd|  d }|tkr|tkrtd| |r||k r|dkrtd |}| }t|f|||||
d|	p i }|
r8|\}}} r. fdd|D }|
|f}n|\}}d }t|d ttfs^|g}|dd }|d|d kr~dnd }|d krdnd}|||| }|sdnt|t|d   }||| k rt||krtd|d krdn
|| | }| t|f|}|
r2|
|jkr2td|
 |di }|d krLi }|rt|trx<|jD ]2}t|| jrf||krf|| t||< qfW t | ||||||||d	S ) Nr   lineterminatorr=   r5   r'   Z	index_colzGKeyword 'index' not supported dd.{0}(...).set_index('my-index') instead)iteratorZ	chunksizez{0} not supported for dd.{1}ZnrowszThe 'nrows' keyword is not supported by `dd.{0}`. To achieve the same behavior, it's recommended to use `dd.{0}(...).head(n=nrows)`rb   r   r0   z,List of header rows not supported for dd.{0}Z
converterszWarning %s compression does not support breaking apart files
Please ensure that each individual file can fit in memory and
use the keyword ``blocksize=None to remove this message``
Setting ``blocksize=None``z#Compression format %s not installedz}Unexpected behavior can result from passing skiprows when
blocksize is smaller than sample size.
Setting ``sample=blocksize``)Z	delimiterrq   samplecompressionZinclude_pathc                s   g | ]} |qS r:   r:   )r>   r5   )path_converterr:   r;   r`   ^  s    zread_pandas.<locals>.<listcomp>namesZinferr   zSample is not large enough to include at least one row of data. Please increase the number of bytes in `sample` in the call to `read_csv`/`read_table`    zFiles already contain the column name: %s, so the path column cannot use this name. Please set `include_path_column` to a unique name.dtype)ri   r4   r_   r5   )!__name__r-   r\   boolr&   rV   r]   rn   setmaxro   ranger$   	TypeErrordictrB   r   r   r   r   cfilesNotImplementedErrorencoder   tuplesplitr   r%   r   r|   rR   floatrl   )r.   urlpathrq   ri   rt   rw   rv   r4   assume_missingstorage_optionsinclude_path_columnr1   reader_namekwrb   ZlastskiprowZfirstrowZb_lineterminatorZb_outZb_samplevaluesr8   r5   ry   r0   ZneedpartsZnpartsrh   r_   r?   r:   )rx   r;   read_pandas  s    

 

 
r   aQ  
Read {file_type} files into a Dask.DataFrame

This parallelizes the :func:`pandas.{reader}` function in the following ways:

- It supports loading many files at once using globstrings:

    >>> df = dd.{reader}('myfiles.*.csv')  # doctest: +SKIP

- In some cases it can break up large files:

    >>> df = dd.{reader}('largefile.csv', blocksize=25e6)  # 25MB chunks  # doctest: +SKIP

- It can read CSV files from external resources (e.g. S3, HDFS) by
  providing a URL:

    >>> df = dd.{reader}('s3://bucket/myfiles.*.csv')  # doctest: +SKIP
    >>> df = dd.{reader}('hdfs:///myfiles.*.csv')  # doctest: +SKIP
    >>> df = dd.{reader}('hdfs://namenode.example.com/myfiles.*.csv')  # doctest: +SKIP

Internally ``dd.{reader}`` uses :func:`pandas.{reader}` and supports many of the
same keyword arguments with the same performance guarantees. See the docstring
for :func:`pandas.{reader}` for more information on available keyword arguments.

Parameters
----------
urlpath : string or list
    Absolute or relative filepath(s). Prefix with a protocol like ``s3://``
    to read from alternative filesystems. To read from multiple files you
    can pass a globstring or a list of paths, with the caveat that they
    must all have the same protocol.
blocksize : str, int or None, optional
    Number of bytes by which to cut up larger files. Default value is
    computed based on available physical memory and the number of cores.
    If ``None``, use a single block for each file.
    Can be a number like 64000000 or a string like "64MB"
collection : boolean, optional
    Return a dask.dataframe if True or list of dask.delayed objects if False
sample : int, optional
    Number of bytes to use when determining dtypes
assume_missing : bool, optional
    If True, all integer columns that aren't specified in ``dtype`` are assumed
    to contain missing values, and are converted to floats. Default is False.
storage_options : dict, optional
    Extra options that make sense for a particular storage connection, e.g.
    host, port, username, password, etc.
include_path_column : bool or str, optional
    Whether or not to include the path to each particular file. If True a new
    column is added to the dataframe called ``path``. If str, sets new column
    name. Default is False.
**kwargs
    Extra keyword arguments to forward to :func:`pandas.{reader}`.

Notes
-----
Dask dataframe tries to infer the ``dtype`` of each column by reading a sample
from the start of the file (or of the first file if it's a glob). Usually this
works fine, but if the ``dtype`` is different later in the file (or in other
files) this can cause issues. For example, if all the rows in the sample had
integer dtypes, but later on there was a ``NaN``, then this would error at
compute time. To fix this, you have a few options:

- Provide explicit dtypes for the offending columns using the ``dtype``
  keyword. This is the recommended solution.

- Use the ``assume_missing`` keyword to assume that all columns inferred as
  integers contain missing values, and convert them to floats.

- Increase the size of the sample using the ``sample`` keyword.

It should also be noted that this function may fail if a {file_type} file
includes quoted strings that contain the line terminator. To get around this
you can specify ``blocksize=None`` to not split files into multiple partitions,
at the cost of reduced parallelism.
c          	      s:   t dd d dddd df	 fdd	}tj||d|_||_|S )NTi  Fc
                s&   t  | f|||||||||	d	|
S )N)	rq   ri   rt   rw   rv   r4   r   r   r   )r   )r   rq   ri   rt   rw   rv   r4   r   r   r   r1   )r.   r:   r;   read  s    
zmake_reader.<locals>.read)r.   	file_type)AUTO_BLOCKSIZEREAD_DOC_TEMPLATErV   __doc__r}   )r.   r   r   r   r:   )r.   r;   make_reader  s    

r   read_csvZCSV
read_tableZ	delimitedread_fwfzfixed-widthc          	   K   s"   |}| j |f| W d Q R X d S )N)to_csv)r6   Zfilr1   fr:   r:   r;   
_write_csv  s    r   c                s   t rd}	d}
nd}	d}
 d|	}t|f||
||| jd|p>i }ttdd|  }|d	 |d	 f g}t|d
kr|rd d< | fddt	|d
d |d
d D  |rt|j
|d dd |D S |S dS )a  
    Store Dask DataFrame to CSV files

    One filename per partition will be created. You can specify the
    filenames in a variety of ways.

    Use a globstring::

    >>> df.to_csv('/path/to/data/export-*.csv')  # doctest: +SKIP

    The * will be replaced by the increasing sequence 0, 1, 2, ...

    ::

        /path/to/data/export-0.csv
        /path/to/data/export-1.csv

    Use a globstring and a ``name_function=`` keyword argument.  The
    name_function function should expect an integer and produce a string.
    Strings produced by name_function must preserve the order of their
    respective partition indices.

    >>> from datetime import date, timedelta
    >>> def name(i):
    ...     return str(date(2015, 1, 1) + i * timedelta(days=1))

    >>> name(0)
    '2015-01-01'
    >>> name(15)
    '2015-01-16'

    >>> df.to_csv('/path/to/data/export-*.csv', name_function=name)  # doctest: +SKIP

    ::

        /path/to/data/export-2015-01-01.csv
        /path/to/data/export-2015-01-02.csv
        ...

    You can also provide an explicit list of paths::

    >>> paths = ['/path/to/data/alice.csv', '/path/to/data/bob.csv', ...]  # doctest: +SKIP
    >>> df.to_csv(paths) # doctest: +SKIP

    Parameters
    ----------
    filename : string
        Path glob indicating the naming scheme for the output files
    name_function : callable, default None
        Function accepting an integer (partition index) and producing a
        string to replace the asterisk in the given filename globstring.
        Should preserve the lexicographic order of partitions
    compression : string or None
        String like 'gzip' or 'xz'.  Must support efficient random access.
        Filenames with extensions corresponding to known compression
        algorithms (gz, bz2) will be compressed accordingly automatically
    sep : character, default ','
        Field delimiter for the output file
    na_rep : string, default ''
        Missing data representation
    float_format : string, default None
        Format string for floating point numbers
    columns : sequence, optional
        Columns to write
    header : boolean or list of string, default True
        Write out column names. If a list of string is given it is assumed
        to be aliases for the column names
    header_first_partition_only : boolean, default False
        If set, only write the header row in the first output file
    index : boolean, default True
        Write row names (index)
    index_label : string or sequence, or False, default None
        Column label for index column(s) if desired. If None is given, and
        `header` and `index` are True, then the index names are used. A
        sequence should be given if the DataFrame uses MultiIndex.  If
        False do not print fields for index names. Use index_label=False
        for easier importing in R
    nanRep : None
        deprecated, use na_rep
    mode : str
        Python write mode, default 'w'
    encoding : string, optional
        A string representing the encoding to use in the output file,
        defaults to 'ascii' on Python 2 and 'utf-8' on Python 3.
    compression : string, optional
        a string representing the compression to use in the output file,
        allowed values are 'gzip', 'bz2', 'xz',
        only used when the first argument is a filename
    line_terminator : string, default '\n'
        The newline character or character sequence to use in the output
        file
    quoting : optional constant from csv module
        defaults to csv.QUOTE_MINIMAL
    quotechar : string (length 1), default '"'
        character used to quote fields
    doublequote : boolean, default True
        Control quoting of `quotechar` inside a field
    escapechar : string (length 1), default None
        character used to escape `sep` and `quotechar` when appropriate
    chunksize : int or None
        rows to write at a time
    tupleize_cols : boolean, default False
        write multi_index columns as a list of tuples (if True)
        or new (expanded format) if False)
    date_format : string, default None
        Format string for datetime objects
    decimal: string, default '.'
        Character recognized as decimal separator. E.g. use ',' for
        European data
    storage_options: dict
        Parameters passed on to the backend filesystem class.

    Returns
    -------
    The names of the file written if they were computed right away
    If not, the delayed tasks associated to the writing of the files
    Nwbzutf-8Zwtencoding)rw   moder   name_functionZnumF)ra   r   r   r0   c                s   g | ]\}}||f qS r:   r:   )r>   dr   )r1   to_csv_chunkr:   r;   r`     s   zto_csv.<locals>.<listcomp>)	schedulerc             S   s   g | ]
}|j qS r:   )r5   )r>   r   r:   r:   r;   r`     s    )r   r]   r   Znpartitionsr   r   Z
to_delayedr-   extendzipcompute)r6   filenamer   rw   r   r   r   Zheader_first_partition_onlyr1   Zdefault_encodingr   r   r   rj   r   r:   )r1   r   r;   r     s*    x
 r   )_Frame)NNTFN)TFNN)NNTNNF)AZ
__future__r   r   r   ior   warningsr   r   r   ZpsutilImportErrorZnumpyr,   Zpandasr)   Zpandas.api.typesr	   rc   bytesr   r   Zbytes.compressionr   r   r   Zcompatibilityr   r   r   r   r   Zutilsr   r   r   r   r   r   r   r   r   Zpandas.types.commonr<   r#   rl   rr   RuntimeWarningZvirtual_memoryZtotalZ	TOTAL_MEMrp   Z	CPU_COUNTr   r   r   r   r   r   r   r   r   Zcorer   r   r:   r:   r:   r;   <module>   sl   


 
+U 
]


 B  
 