B
    F.\                 @   s   d dl mZmZmZ d dlmZ d dlZd dlZd dlm	Z	 d dl
Zyd dlmZ W n  ek
rt   d dlmZ Y nX ddlmZmZmZ ddlmZ dd	lmZmZ dd
lmZ eeZG dd deZG dd deZdS )    )print_functiondivisionabsolute_import)defaultdictN)gen)merge   )Future_get_global_clientClient)time)tokey
log_errors)
get_clientc               @   sZ   e Zd ZdZdd ZdddZejdd Zdd	d
Z	ejdddZ
ejdddZdS )VariableExtensionz An extension for the scheduler to manage queues

    This adds the following routes to the scheduler

    *  variable-set
    *  variable-get
    *  variable-delete
    c             C   sv   || _ t | _tt| _ttjj| _	tj | _
| j j| j| jd | j| j jd< | j| j jd< | | j jd< d S )N)variable_setvariable_getzvariable-future-releasevariable_delete	variables)	schedulerdictr   r   setwaitingtornadoZlocksZ	Conditionwaiting_conditionsstartedZhandlersupdategetfuture_releaseZstream_handlersdelete
extensions)selfr    r"   3lib/python3.7/site-packages/distributed/variable.py__init__!   s    
zVariableExtension.__init__Nc             C   s   |d k	r*d|d}| j j|gd| d n
d|d}y| j| }W n tk
rV   Y n*X |d dkr|d |kr| |d | || jkr| j  || j|< d S )Nr	   )typevaluezvariable-%s)keysclientZmsgpackr%   r&   )r   Zclient_desires_keysr   KeyErrorreleaser   
notify_all)r!   streamnamekeydatar(   recordoldr"   r"   r#   r   0   s    



zVariableExtension.setc             c   sJ   x"| j ||f r"| j|  V  qW | jj|gd| d | j ||f= d S )Nzvariable-%s)r'   r(   )r   r   waitr   Zclient_releases_keys)r!   r.   r-   r"   r"   r#   r*   A   s
    
zVariableExtension.releasec             C   s4   | j ||f | | j ||f s0| j|   d S )N)r   remover   r+   )r!   r-   r.   tokenr(   r"   r"   r#   r   J   s    z VariableExtension.future_releasec             c   s   t  }xN|| jkrT|d k	r*|t  |  }nd }|rB|dk rBt | jj|dV  qW | j| }|d dkr|d }t j}	| j	j
|}
|
d k	r|
jnd}|	|d}|dkr|
jj|d	< |
jj|d
< t||}| j||f |	 t|d S )Nr   )timeoutr%   r	   r&   Zlost)r4   stateerred	exception	traceback)r   r   r   TimeoutErrorr   r2   uuiduuid4hexr   Ztasksr   r6   Zexception_blamer8   r9   r   r   addReturn)r!   r,   r-   r(   r5   startleftr0   r.   r4   Ztsr6   msgr"   r"   r#   r   O   s*    



zVariableExtension.getc          	   c   sh   t  X y| j| }W n tk
r*   Y n X |d dkrJ| |d |V  | j|= | j|= W d Q R X d S )Nr%   r	   r&   )r   r   r)   r*   r   )r!   r,   r-   r(   r1   r"   r"   r#   r   h   s    zVariableExtension.delete)NNNNN)NNNN)NNNN)NNN)__name__
__module____qualname____doc__r$   r   r   	coroutiner*   r   r   r   r"   r"   r"   r#   r      s   
	
r   c               @   sb   e Zd ZdZdddZejdd Zdd	 Zejdd
dZ	dddZ
dd Zdd Zdd ZdS )Variablea.   Distributed Global Variable

    This allows multiple clients to share futures and data between each other
    with a single mutable variable.  All metadata is sequentialized through the
    scheduler.  Race conditions can occur.

    Values must be either Futures or msgpack-encodable data (ints, lists,
    strings, etc..)  All data will be kept and sent through the scheduler, so
    it is wise not to send too much.  If you want to share a large amount of
    data then ``scatter`` it and share the future instead.

    .. warning::

       This object is experimental and has known issues in Python 2

    Examples
    --------
    >>> from dask.distributed import Client, Variable # doctest: +SKIP
    >>> client = Client()  # doctest: +SKIP
    >>> x = Variable('x')  # doctest: +SKIP
    >>> x.set(123)  # docttest: +SKIP
    >>> x.get()  # docttest: +SKIP
    123
    >>> future = client.submit(f, x)  # doctest: +SKIP
    >>> x.set(future)  # doctest: +SKIP

    See Also
    --------
    Queue: shared multi-producer/multi-consumer queue between clients
    Nr   c             C   s$   |pt  | _|pdt j | _d S )Nz	variable-)r
   r(   r;   r<   r=   r-   )r!   r-   r(   maxsizer"   r"   r#   r$      s    zVariable.__init__c             c   sB   t |tr(| jjjt|j| jdV  n| jjj|| jdV  d S )N)r.   r-   )r/   r-   )
isinstancer	   r(   r   r   r   r.   r-   )r!   r&   r"   r"   r#   _set   s
    

zVariable._setc             K   s   | j j| j|f|S )z Set the value of this variable

        Parameters
        ----------
        value: Future or object
            Must be either a Future or a msgpack-encodable value
        )r(   syncrK   )r!   r&   kwargsr"   r"   r#   r      s    zVariable.setc             c   s   | j jj|| j| j jdV }|d dkrt|d | j d|d d}|d dkrd|j|d	 |d
  | j d| j|d |d d n|d }t	
|d S )N)r5   r-   r(   r%   r	   r&   Tr6   )Zinformr6   r7   r8   r9   zvariable-future-releaser4   )opr-   r.   r4   )r(   r   r   r-   idr	   Z_stateZ	set_error_send_to_schedulerr   r?   )r!   r5   dr&   r"   r"   r#   _get   s    
zVariable._getc             K   s   | j j| jfd|i|S )z  Get the value of this variable r5   )r(   rL   rR   )r!   r5   rM   r"   r"   r#   r      s    zVariable.getc             C   s$   | j jdkr | j d| jd dS )zn Delete this variable

        Caution, this affects all clients currently pointing to this variable.
        Zrunningr   )rN   r-   N)r(   ZstatusrP   r-   )r!   r"   r"   r#   r      s    zVariable.deletec             C   s   | j | jjjfS )N)r-   r(   r   address)r!   r"   r"   r#   __getstate__   s    zVariable.__getstate__c          	   C   s\   |\}}yt |}|jj|ks"tW n$ ttfk
rH   t|dd}Y nX | j||d d S )NF)Zset_as_default)r-   r(   )r   r   rS   AssertionErrorAttributeErrorr   r$   )r!   r6   r-   rS   r(   r"   r"   r#   __setstate__   s    zVariable.__setstate__)NNr   )N)N)rC   rD   rE   rF   r$   r   rG   rK   r   rR   r   r   rT   rW   r"   r"   r"   r#   rH   v   s   
	

	rH   ) Z
__future__r   r   r   collectionsr   Zloggingr;   r   r   Ztornado.locksZcytoolzr   ImportErrorZtoolzr(   r	   r
   r   Zmetricsr   Zutilsr   r   Zworkerr   Z	getLoggerrC   Zloggerobjectr   rH   r"   r"   r"   r#   <module>   s    
_