B
    G\>                 @   sb  d dl mZmZmZ d dlmZ d dlZd dlZd dlm	Z	 yd dl
mZmZ W n$ ek
rt   d dlmZmZ Y nX d dlZddlmZ dd	lmZ dd
lmZ ddlmZmZ ddlmZmZmZ i ZejdZejdZdRddZ dd Z!dd Z"dd Z#dd dD Z$dd Z%dd Z&dd Z'i Z(d d! Z)e)d"e e! e)d#e"e# e)d$e%e& e)d%de' dSd'd(Z*dTd)d*Z+G d+d, d,e,Z-e-Z.G d-d. d.e,Z/d/d0 Z0d1d2 Z1dUd4d5Z2d6d7 Z3d8d9 Z4d:d; Z5d<d= Z6d>d? Z7d@dA Z8dBdC Z9ee	j:e/dDdE Z;e:e<e=fdFdG Z>e:e<e=fdHdI Z?dJdK Z@dLdM ZAdNdO ZBe:eCeB dPdQ ZDdS )V    )print_functiondivisionabsolute_import)partialN)normalize_token)valmapget_in   )pickle   )PY2)has_keyword)maybe_compress
decompress)unpack_framespack_frames_preludeframe_split_sizedask_serializedask_deserializec             C   s   t t| }ytt| }W n tk
r:   t|Y nX t|drX|| |d\}}n|| \}}||d< tt| |d< d|d< ||fS )z/Serialise object using the class-based registrycontext)r   typeztype-serializeddask
serializer)	typenamer   r   dispatch	TypeErrorNotImplementedErrorr   r
   dumps)xr   Z	type_namer   headerframes r!   =lib/python3.7/site-packages/distributed/protocol/serialize.py
dask_dumps   s    
r#   c             C   s"   t | d }t|}|| |S )Nztype-serialized)r
   loadsr   r   )r   r    typr$   r!   r!   r"   
dask_loads.   s    
r&   c             C   s   ddit | gfS )Nr   r
   )r
   r   )r   r!   r!   r"   pickle_dumps4   s    r'   c             C   s   t d|S )N    )r
   r$   join)r   r    r!   r!   r"   pickle_loads8   s    r*   c             C   s   i | ]}d d| qS )iz
max_%s_lenr!   ).0r   r!   r!   r"   
<dictcomp><   s   r,   )strbinZarraymapZextc             C   s@   yt j| dd}W n tk
r,   t Y nX ddi|gfS d S )NT)use_bin_typer   msgpack)r1   r   	Exceptionr   )r   framer!   r!   r"   msgpack_dumpsA   s
    r4   c             C   s   t jd|fdddtS )Nr(   utf8F)encodinguse_list)r1   r$   r)   msgpack_len_opts)r   r    r!   r!   r"   msgpack_loadsJ   s    r9   c             C   s    d dd |D }t|d S )N
c             S   s   g | ]}| d qS )r5   )decode)r+   r3   r!   r!   r"   
<listcomp>P   s    z-serialization_error_loads.<locals>.<listcomp>)r)   r   )r   r    msgr!   r!   r"   serialization_error_loadsO   s    r>   c             C   s   |||ot |dft| < d S )Nr   )r   families)namer   r$   r!   r!   r"   register_serialization_familyW   s    rA   r   r
   r1   errormessagec             C   s"  |dkrd}t | tr"| j| jfS d}x|D ]}t| \}}}y,|rP|| |dn|| \}	}
||	d< |	|
fS  tk
r   w,Y q, tk
r } zt }P W dd}~X Y q,X q,W dt	| j
 }|dk r|g}
|r|
|dd  d	d
 |
D }
ddi|
fS |dkrt|t| dd dS )a  
    Convert object to a header and list of bytestrings

    This takes in an arbitrary Python object and returns a msgpack serializable
    header and a list of bytes or memoryview objects.

    The serialization protocols to use are configurable: a list of names
    define the set of serializers to use, in order. These names are keys in
    the ``serializer_registry`` dict (e.g., 'pickle', 'msgpack'), which maps
    to the de/serialize functions. The name 'dask' is special, and will use the
    per-class serialization methods. ``None`` gives the default list
    ``['dask', 'pickle']``.

    Examples
    --------
    >>> serialize(1)
    ({}, [b'\x80\x04\x95\x03\x00\x00\x00\x00\x00\x00\x00K\x01.'])

    >>> serialize(b'123')  # some special types get custom treatment
    ({'type': 'builtins.bytes'}, [b'123'])

    >>> deserialize(*serialize(1))
    1

    Returns
    -------
    header: dictionary containing any msgpack-serializable metadata
    frames: list of bytes or memoryviews, commonly of length one

    See Also
    --------
    deserialize: Convert header and frames back to object
    to_serialize: Mark that data in a message should be serialized
    register_serialization: Register custom serialization functions
    N)r   r
    )r   r   z&Could not serialize object of type %s.rC   i c             S   s   g | ]}|  qS r!   )encode)r+   r3   r!   r!   r"   r<      s    zserialize.<locals>.<listcomp>rB   raisei'  )
isinstance
Serializedr   r    r?   r   r2   	traceback
format_excr   __name__appendr   r-   )r   ZserializersZon_errorr   tbr@   r   r$   wants_contextr   r    er=   r!   r!   r"   	serializea   s2    $



rP   c             C   sJ   |  d}|dk	r2||kr2td|tt|f t| \}}}|| |S )a  
    Convert serialized header and list of bytestrings back to a Python object

    Parameters
    ----------
    header: dict
    frames: list of bytes
    deserializers : Optional[Dict[str, Tuple[Callable, Callable, bool]]]
        An optional dict mapping a name to a (de)serializer.
        See `dask_serialize` and `dask_deserialize` for more.

    See Also
    --------
    serialize
    r   NzAData serialized with %s but only able to deserialize data with %s)getr   r-   listr?   )r   r    Zdeserializersr@   r   r$   rN   r!   r!   r"   deserialize   s    
rS   c               @   s8   e Zd ZdZdd Zdd Zdd Zdd	 Zd
d ZdS )	Serializea   Mark an object that should be serialized

    Example
    -------
    >>> msg = {'op': 'update', 'data': to_serialize(123)}
    >>> msg  # doctest: +SKIP
    {'op': 'update', 'data': <Serialize: 123>}

    See also
    --------
    distributed.protocol.dumps
    c             C   s
   || _ d S )N)data)selfrU   r!   r!   r"   __init__   s    zSerialize.__init__c             C   s   dt | j S )Nz<Serialize: %s>)r-   rU   )rV   r!   r!   r"   __repr__   s    zSerialize.__repr__c             C   s   t |to|j| jkS )N)rG   rT   rU   )rV   otherr!   r!   r"   __eq__   s    
zSerialize.__eq__c             C   s
   | |k S )Nr!   )rV   rY   r!   r!   r"   __ne__   s    zSerialize.__ne__c             C   s
   t | jS )N)hashrU   )rV   r!   r!   r"   __hash__   s    zSerialize.__hash__N)	rK   
__module____qualname____doc__rW   rX   rZ   r[   r]   r!   r!   r!   r"   rT      s   rT   c               @   s0   e Zd ZdZdd Zdd Zdd Zdd	 Zd
S )rH   a
  
    An object that is already serialized into header and frames

    Normal serialization operations pass these objects through.  This is
    typically used within the scheduler which accepts messages that contain
    data without actually unpacking that data.
    c             C   s   || _ || _d S )N)r   r    )rV   r   r    r!   r!   r"   rW      s    zSerialized.__init__c             C   s&   ddl m} || j| j}t| j|S )Nr	   )r   )Zcorer   r   r    rS   )rV   r   r    r!   r!   r"   rS      s    zSerialized.deserializec             C   s"   t |to |j| jko |j| jkS )N)rG   rH   r   r    )rV   rY   r!   r!   r"   rZ      s    
zSerialized.__eq__c             C   s
   | |k S )Nr!   )rV   rY   r!   r!   r"   r[      s    zSerialized.__ne__N)rK   r^   r_   r`   rW   rS   rZ   r[   r!   r!   r!   r"   rH      s
   rH   c             C   s4   t | }|tkrttt| S |tkr0tt| S | S )N)r   rR   r/   container_copydictr   )cr%   r!   r!   r"   ra      s    
ra   c             C   s   i }t | | |r\t| } x@|D ]8}t|dd | }t|trL||d = q d||d < q W t }x:| D ].\}}t|tt	fkrlt
|||< || qlW | ||fS )ak   Pull out Serialize objects from message

    This also remove large bytestrings from the message into a second
    dictionary.

    Examples
    --------
    >>> from distributed.protocol import to_serialize
    >>> msg = {'op': 'update', 'data': to_serialize(123)}
    >>> extract_serialize(msg)
    ({'op': 'update'}, {('data',): <Serialize: 123>}, set())
    N)_extract_serializera   r   rG   rb   setitemsr   bytes	bytearrayto_serializeadd)r   serpathtZbytestringskvr!   r!   r"   extract_serialize  s    


rq   r!   c             C   s  t | tkrxv|  D ]j\}}t |}|tks6|tkrJt||||f  q|tksr|tksr|ttfkrt	|dkr||||f < qW nt | tkrxvt
| D ]j\}}t |}|tks|tkrt||||f  q|tks|tks|ttfkrt	|dkr||||f < qW d S )Ni   )r   rb   rg   rR   re   rT   rH   rh   ri   len	enumerate)r   rl   rm   ro   rp   r%   r!   r!   r"   re   $  s     re   c                s    fdd  | S )z
    Replace all Serialize and Serialized values nested in *x*
    with the original values.  Returns a copy of *x*.

    >>> msg = {'op': 'update', 'data': to_serialize(123)}
    >>> nested_deserialize(msg)
    {'op': 'update', 'data': 123}
    c                s  t | tkr|  } x|  D ]\\}}t |}|tks>|tkrL || |< q|tkr`|j| |< q|tkrt|j	|j
| |< qW n~t | tkrt| } xht| D ]\\}}t |}|tks|tkr̈ || |< q|tkr|j| |< q|tkrt|j	|j
| |< qW | S )N)r   rb   copyrg   rR   rT   rU   rH   rS   r   r    rs   )r   ro   rp   r%   )replace_innerr!   r"   ru   @  s*    z)nested_deserialize.<locals>.replace_innerr!   )r   r!   )ru   r"   nested_deserialize7  s    	rv   c             K   sr   t | f|\}}t|}|r0ttt| \}}ng }||d< t||d< tj|dd}|gt| }t	|g| S )NcompressioncountT)r0   )
rP   r   zipr/   r   rr   r1   r   rR   r   )r   kwargsr   r    rw   Zframes2r!   r!   r"   serialize_bytelist\  s    r{   c             K   s(   t | f|}trdd |D }d|S )Nc             S   s   g | ]}t |qS r!   )rh   )r+   yr!   r!   r"   r<   n  s    z#serialize_bytes.<locals>.<listcomp>r(   )r{   r   r)   )r   rz   Lr!   r!   r"   serialize_bytesk  s    r~   c             C   sL   t | }|d |dd   }}|r4tj|ddd}ni }t||}t||S )Nr   r	   F)rawr7   )r   r1   r$   r   rS   )br    r   r!   r!   r"   deserialize_bytesr  s    
r   c             C   s2   t | trtdt| | t| | dS )a   Register a new class for dask-custom serialization

    Parameters
    ----------
    cls: type
    serialize: callable(cls) -> Tuple[Dict, List[bytes]]
    deserialize: callable(header: Dict, frames: List[bytes]) -> cls

    Examples
    --------
    >>> class Human(object):
    ...     def __init__(self, name):
    ...         self.name = name

    >>> def serialize(human):
    ...     header = {}
    ...     frames = [human.name.encode()]
    ...     return header, frames

    >>> def deserialize(header, frames):
    ...     return Human(frames[0].decode())

    >>> register_serialization(Human, serialize, deserialize)
    >>> serialize(Human('Alice'))
    ({}, [b'Alice'])

    See Also
    --------
    serialize
    deserialize
    z^Strings are no longer accepted for type registration. Use dask_serialize.register_lazy insteadN)rG   r-   r   r   registerr   )clsrP   rS   r!   r!   r"   register_serialization  s
     
r   c             C   s   t ddS )z[Register a registration function to be called if *toplevel*
    module is ever loaded.
    z9Serialization registration has changed. See documentationN)r2   )Ztoplevelfuncr!   r!   r"   register_serialization_lazy  s    r   c             C   s   | j d | j S )z Return name of type

    Examples
    --------
    >>> from distributed import Scheduler
    >>> typename(Scheduler)
    'distributed.scheduler.Scheduler'
    .)r^   rK   )r%   r!   r!   r"   r     s    	r   c             C   s   | j g| j S )N)r   r    )or!   r!   r"   normalize_Serialized  s    r   c             C   s   i }| g}||fS )Nr!   )objr   r    r!   r!   r"   _serialize_bytes  s    r   c             C   s   |d S )Nr   r!   )r   r    r!   r!   r"   _deserialize_bytes  s    r   c                sr   t |   tkpp tkpp tkppt| trVttt| 	 rVt fdd| 
 D ppt| ttfopttt| S )Nc             3   s   | ]} t kV  qd S )N)r-   )r+   r   )r%   r!   r"   	<genexpr>  s    z+_is_msgpack_serializable.<locals>.<genexpr>)r   r-   intfloatrG   rb   allr/   _is_msgpack_serializablevalueskeysrR   tuple)rp   r!   )r%   r"   r     s
    r   c             C   s   dt t| i i d}g }t| tr,| }n| j}xz| D ]n\}}t|rZ||d |< q<t|trrt|\}}nt	|\}}|t
|t
|t
| d|d |< ||7 }q<W ||fS )Nr   )r   ztype-serializedsimplecomplexr   )r   startstopr   )r
   r   r   rG   rb   __dict__rg   r   serialize_object_with_dictrP   rr   )Zestr   r    dro   rp   hfr!   r!   r"   r     s&    

r   c       
      C   s   t | d }t|tr"i  }}nt|}|j}|| d  xF| d  D ]6\}}|d }||d |d  }t	||}	|	||< qNW |S )Nztype-serializedr   r   r   r   r   )
r
   r$   
issubclassrb   object__new__r   updaterg   rS   )
r   r    r   Zddr   ro   r   r   r   rp   r!   r!   r"   deserialize_object_with_dict  s    



r   c             C   s    t | t t| t dS )a   Register dask_(de)serialize to traverse through __dict__

    Normally when registering new classes for Dask's custom serialization you
    need to manage headers and frames, which can be tedious.  If all you want
    to do is traverse through your object and apply serialize to all of your
    object's attributes then this function may provide an easier path.

    This registers a class for the custom Dask serialization family.  It
    serializes it by traversing through its __dict__ of attributes and applying
    ``serialize`` and ``deserialize`` recursively.  It collects a set of frames
    and keeps small attributes in the header.  Deserialization reverses this
    process.

    This is a good idea if the following hold:

    1.  Most of the bytes of your object are composed of data types that Dask's
        custom serializtion already handles well, like Numpy arrays.
    2.  Your object doesn't require any special constructor logic, other than
        object.__new__(cls)

    Examples
    --------
    >>> import sklearn.base
    >>> from distributed.protocol import register_generic
    >>> register_generic(sklearn.base.BaseEstimator)

    See Also
    --------
    dask_serialize
    dask_deserialize
    N)r   r   r   r   r   )r   r!   r!   r"   register_generic  s     r   )N)NrC   N)N)r!   )EZ
__future__r   r   r   	functoolsr   rI   r   Z	dask.baser   Zcytoolzr   r   ImportErrorZtoolzr1   rD   r
   Zcompatibilityr   Zutilsr   rw   r   r   r   r   r   Zlazy_registrationsZDispatchr   r   r#   r&   r'   r*   r8   r4   r9   r>   r?   rA   rP   rS   r   rT   rj   rH   ra   rq   re   rv   r{   r~   r   r   r   r   r   r   rh   ri   r   r   r   r   r   rb   r   r!   r!   r!   r"   <module>   sn   
	
E
	 
%)
