B
    T\F                 @   s   d dl mZmZmZ d dlZd dlZd dlmZ ddlm	Z	 ddl
mZmZ ddlmZ ddlmZmZ d	d
lmZ eddZddedejdddfddZdd Zdd ZdS )    )print_functiondivisionabsolute_importN)concat   )unicode)system_encodingparse_bytes)delayed)
open_files
read_bytes   )from_delayedT)ZpureZinferstrictc	                s   |dk	r|dk	rt dt|ttfr.t|}t| fd |d|pFi }	|dkr|dkrldd |	D }
qg }
xtdt|	|D ]4}|	|||  }tt	tt
t|}|
| qW n>t| f| |d|d	|pi \}}
 fd
dt	|
D }
|
s
t d| |s|
S t|
S dS )a   Read lines from text files

    Parameters
    ----------
    urlpath : string or list
        Absolute or relative filepath(s). Prefix with a protocol like ``s3://``
        to read from alternative filesystems. To read from multiple files you
        can pass a globstring or a list of paths, with the caveat that they
        must all have the same protocol.
    blocksize: None, int, or str
        Size (in bytes) to cut up larger files.  Streams by default.
        Can be ``None`` for streaming, an integer number of bytes, or a string
        like "128MiB"
    compression: string
        Compression format like 'gzip' or 'xz'.  Defaults to 'infer'
    encoding: string
    errors: string
    linedelimiter: string
    collection: bool, optional
        Return dask.bag if True, or list of delayed values if false
    storage_options: dict
        Extra options that make sense to a particular storage connection, e.g.
        host, port, username, password, etc.
    files_per_partition: None or int
        If set, group input files into partitions of the requested size,
        instead of one partition per file. Mutually exclusive with blocksize.

    Examples
    --------
    >>> b = read_text('myfiles.1.txt')  # doctest: +SKIP
    >>> b = read_text('myfiles.*.txt')  # doctest: +SKIP
    >>> b = read_text('myfiles.*.txt.gz')  # doctest: +SKIP
    >>> b = read_text('s3://bucket/myfiles.*.txt')  # doctest: +SKIP
    >>> b = read_text('s3://key:secret@bucket/myfiles.*.txt')  # doctest: +SKIP
    >>> b = read_text('hdfs://namenode.example.com/myfiles.*.txt')  # doctest: +SKIP

    Parallelize a large file by providing the number of uncompressed bytes to
    load into each partition.

    >>> b = read_text('largefile.txt', blocksize='10MB')  # doctest: +SKIP

    Returns
    -------
    dask.bag.Bag if collection is True or list of Delayed lists otherwise

    See Also
    --------
    from_sequence: Build bag from Python sequence
    Nz7Only one of blocksize or files_per_partition can be setZrt)modeencodingerrorscompressionc             S   s    g | ]}t tt t|qS  )r
   listfile_to_blocks).0Zfilr   r   ,lib/python3.7/site-packages/dask/bag/text.py
<listcomp>P   s    zread_text.<locals>.<listcomp>r   F)Z	delimiter	blocksizeZsampler   c                s   g | ]}t t| qS r   )r
   decode)r   b)r   r   r   r   r   \   s    zNo files found)
ValueError
isinstancestrr   r	   r   rangelenr
   r   mapr   appendr   encoder   )Zurlpathr   r   r   r   ZlinedelimiterZ
collectionZstorage_optionsZfiles_per_partitionfilesZblocksstartZblock_filesZblock_lines_r   )r   r   r   	read_text   s2    5

r(   c          	   c   s(   | }x|D ]
}|V  qW W d Q R X d S )Nr   )Z	lazy_filefliner   r   r   r   g   s    
r   c             C   s   |  ||}t|}t|S )N)r   ioStringIOr   )blockr   r   textlinesr   r   r   r   m   s    
r   )Z
__future__r   r   r   r+   osZtoolzr   Zcompatibilityr   Zutilsr   r	   r
   bytesr   r   Zcorer   linesepr(   r   r   r   r   r   r   <module>   s   
S