B
    T\              	   @   sN   d dl Zd dlZddlmZ ddlmZ ddlm	Z	m
Z
 dd	d
Zdd ZdS )    N   )delayed)string_types   )from_delayedfrom_pandas      c       #         s  ddl }ddl m} ddlm} |dkr0td|dkr<i n|}|j|f|}| }t trt|j	 |d||	d t|tr j
| n|}t|t|jf std| |r|rtd	|rЇ fd
d|D nt j
}||kr|t|tr j
| n| t|tr||d< n
|j|d< |
dkr||| }tj||f|}|jr j} j}	tj|||	|d}t|ddS |jddd d }|dd }
n|dkr|dkrtd|dkr|dkr"||j||j|g }t||}|jd \}}|jd }n|\}}t |j!}|dkr||j"|g }t||d d }t#|| | pd}|j$dkrtj%||d|| & |  d' }||d< ||d< n2|j$dkrt()|||d ' }ntd*|g }|dd |dd  }}x~t+t,||D ]l\}\} }!|t-|d krL||!kn||!k }"||.|/|| k|" }|t0t1|||
f| q&W t2||
|dS )a/  
    Create dataframe from an SQL table.

    If neither divisions or npartitions is given, the memory footprint of the
    first few rows will be determined, and partitions of size ~256MB will
    be used.

    Parameters
    ----------
    table : string or sqlalchemy expression
        Select columns from here.
    uri : string
        Full sqlalchemy URI for the database connection
    index_col : string
        Column which becomes the index, and defines the partitioning. Should
        be a indexed column in the SQL server, and any orderable type. If the
        type is number or time, then partition boundaries can be inferred from
        npartitions or bytes_per_chunk; otherwide must supply explicit
        ``divisions=``.
        ``index_col`` could be a function to return a value, e.g.,
        ``sql.func.abs(sql.column('value')).label('abs(value)')``.
        Labeling columns created by functions or arithmetic operations is
        required.
    divisions: sequence
        Values of the index column to split the table by. If given, this will
        override npartitions and bytes_per_chunk. The divisions are the value
        boundaries of the index column used to define the partitions. For
        example, ``divisions=list('acegikmoqsuwz')`` could be used to partition
        a string column lexographically into 12 partitions, with the implicit
        assumption that each partition contains similar numbers of records.
    npartitions : int
        Number of partitions, if divisions is not given. Will split the values
        of the index column linearly between limits, if given, or the column
        max/min. The index column must be numeric or time for this to work
    limits: 2-tuple or None
        Manually give upper and lower range of values for use with npartitions;
        if None, first fetches max/min from the DB. Upper limit, if
        given, is inclusive.
    columns : list of strings or None
        Which columns to select; if None, gets all; can include sqlalchemy
        functions, e.g.,
        ``sql.func.abs(sql.column('value')).label('abs(value)')``.
        Labeling columns created by functions or arithmetic operations is
        recommended.
    bytes_per_chunk : int
        If both divisions and npartitions is None, this is the target size of
        each partition, in bytes
    head_rows : int
        How many rows to load for inferring the data-types, unless passing meta
    meta : empty DataFrame or None
        If provided, do not attempt to infer dtypes, but use these, coercing
        all chunks on load
    schema : str or None
        If using a table name, pass this to sqlalchemy to select which DB
        schema to use within the URI connection
    engine_kwargs : dict or None
        Specific db engine parameters for sqlalchemy
    kwargs : dict
        Additional parameters to pass to `pd.read_sql()`

    Returns
    -------
    dask.dataframe

    Examples
    --------
    >>> df = dd.read_sql_table('accounts', 'sqlite:///path/to/bank.db',
    ...                  npartitions=10, index_col='id')  # doctest: +SKIP
    r   N)sql)elementsz)Must specify index column to partition onT)ZautoloadZautoload_withschemaz?Use label when passing an SQLAlchemy instance as the index (%s)z5Must supply either divisions or npartitions, not bothc                s$   g | ]}t |tr j| n|qS  )
isinstancer   columns).0c)tabler   4lib/python3.7/site-packages/dask/dataframe/io/sql.py
<listcomp>e   s   z"read_sql_table.<locals>.<listcomp>	index_col)r   r   r   )npartitions)Zdeepindexr	   z>Must provide divisions or npartitions whenusing explicit meta.Zmax_1Zcount_1Mz%iS)startendZfreq)iufzwProvided index column is of type "{}".  If divisions is not provided the index column type must be numeric or datetime.)	divisions)3Z
sqlalchemyr
   Zsqlalchemy.sqlr   
ValueErrorZcreate_engineZMetaDatar   r   ZTabler   ZLabel	TypeErrorlistappendnameZselectlimitZselect_frompdread_sqlemptyr   read_sql_tabler   Zmemory_usagesumfuncmaxminZilocdtypesZSeriesdtypecountroundZkindZ
date_rangeZtotal_secondstolistnpZlinspaceformat	enumerateziplenwhereand_r   _read_sql_chunkr   )#r   urir   r   r   Zlimitsr   Zbytes_per_chunkZ	head_rowsr   metaZengine_kwargskwargsZsar
   r   Zenginemr   qheadr$   Zbytes_per_rowZminmaxZmaxiZminir/   r0   partsZlowersZuppersr   lowerupperZcondr   )r   r   r)   	   s    H
 







" r)   c             K   s2   t j| |f|}|jr|S |j|j ddS d S )NF)copy)r&   r'   r(   Zastyper.   Zto_dict)r?   r;   r<   r=   Zdfr   r   r   r:      s    r:   )	NNNNr   r	   NNN)Znumpyr3   Zpandasr&    r   Zcompatibilityr   ior   r   r)   r:   r   r   r   r   <module>   s     
 #