ó
ßüÚ\c           @` s)  d  d l  m Z m Z m Z d  d l m Z d  d l Z d  d l Z d  d l m	 Z	 d  d l
 Z y d  d l m Z Wn! e k
 r— d  d l m Z n Xd d l m Z m Z m Z d d l m Z d d	 l m Z m Z d d
 l m Z e j e ƒ Z d e f d „  ƒ  YZ d e f d „  ƒ  YZ d S(   i    (   t   print_functiont   divisiont   absolute_import(   t   defaultdictN(   t   gen(   t   mergei   (   t   Futuret   _get_global_clientt   Client(   t   time(   t   tokeyt
   log_errors(   t
   get_clientt   VariableExtensionc           B` s   e  Z d  Z d „  Z d d d d d d „ Z e j d „  ƒ Z d d d d d „ Z	 e j d d d d d „ ƒ Z
 e j d d d d „ ƒ Z RS(   sª    An extension for the scheduler to manage queues

    This adds the following routes to the scheduler

    *  variable-set
    *  variable-get
    *  variable-delete
    c         C` s¬   | |  _  t ƒ  |  _ t t ƒ |  _ t t j j ƒ |  _	 t j j ƒ  |  _
 |  j  j j i |  j d 6|  j d 6ƒ |  j |  j  j d <|  j |  j  j d <|  |  j  j d <d  S(   Nt   variable_sett   variable_gets   variable-future-releaset   variable_deletet	   variables(   t	   schedulert   dictR   R   t   sett   waitingt   tornadot   lockst	   Conditiont   waiting_conditionst   startedt   handlerst   updatet   gett   future_releaset   stream_handlerst   deletet
   extensions(   t   selfR   (    (    s3   lib/python2.7/site-packages/distributed/variable.pyt   __init__!   s    	c         C` sã   | d  k	 rC i d d 6| d 6} |  j j d | g d d | ƒ n i d d 6| d 6} y |  j | } Wn t k
 r{ n8 X| d d k r³ | d | k r³ |  j | d | ƒ n  | |  j k rÒ |  j j ƒ  n  | |  j | <d  S(   NR   t   typet   valuet   keyst   clients   variable-%st   msgpack(   t   NoneR   t   client_desires_keysR   t   KeyErrort   releaseR   t
   notify_all(   R"   t   streamt   namet   keyt   dataR'   t   recordt   old(    (    s3   lib/python2.7/site-packages/distributed/variable.pyR   1   s    # c         c` s`   x) |  j  | | f r+ |  j | j ƒ  Vq W|  j j d | g d d | ƒ |  j  | | f =d  S(   NR&   R'   s   variable-%s(   R   R   t   waitR   t   client_releases_keys(   R"   R0   R/   (    (    s3   lib/python2.7/site-packages/distributed/variable.pyR,   B   s     c         C` sE   |  j  | | f j | ƒ |  j  | | f sA |  j | j ƒ  n  d  S(   N(   R   t   removeR   R-   (   R"   R/   R0   t   tokenR'   (    (    s3   lib/python2.7/site-packages/distributed/variable.pyR   J   s    c         c` sb  t  ƒ  } xn | |  j k ry | d  k	 r; | t  ƒ  | } n d  } | rb | d k  rb t j ƒ  ‚ n  |  j j d | ƒ Vq W|  j | } | d d k rO| d } t j ƒ  j	 }	 |  j
 j j | ƒ }
 |
 d  k	 rÚ |
 j n d } i |	 d 6| d 6} | d	 k r#|
 j j | d
 <|
 j j | d <n  t | | ƒ } |  j | | f j |	 ƒ n  t j | ƒ ‚ d  S(   Ni    t   timeoutR$   R   R%   t   lostR7   t   statet   erredt	   exceptiont	   traceback(   R	   R   R)   R   t   TimeoutErrorR   R4   t   uuidt   uuid4t   hexR   t   tasksR   R:   t   exception_blameR<   R=   R   R   t   addt   Return(   R"   R.   R/   R'   R8   t   startt   leftR2   R0   R7   t   tsR:   t   msg(    (    s3   lib/python2.7/site-packages/distributed/variable.pyR   O   s*    	
c      	   c` su   t  ƒ  f y |  j | } Wn t k
 r. n) X| d d k rW |  j | d | ƒ Vn  |  j | =|  j | =Wd  QXd  S(   NR$   R   R%   (   R   R   R+   R,   R   (   R"   R.   R/   R'   R3   (    (    s3   lib/python2.7/site-packages/distributed/variable.pyR    h   s    

N(   t   __name__t
   __module__t   __doc__R#   R)   R   R   t	   coroutineR,   R   R   R    (    (    (    s3   lib/python2.7/site-packages/distributed/variable.pyR      s   	t   Variablec           B` sw   e  Z d  Z d
 d
 d d „ Z e j d „  ƒ Z d „  Z e j d
 d „ ƒ Z	 d
 d „ Z
 d „  Z d „  Z d	 „  Z RS(   s.   Distributed Global Variable

    This allows multiple clients to share futures and data between each other
    with a single mutable variable.  All metadata is sequentialized through the
    scheduler.  Race conditions can occur.

    Values must be either Futures or msgpack-encodable data (ints, lists,
    strings, etc..)  All data will be kept and sent through the scheduler, so
    it is wise not to send too much.  If you want to share a large amount of
    data then ``scatter`` it and share the future instead.

    .. warning::

       This object is experimental and has known issues in Python 2

    Examples
    --------
    >>> from dask.distributed import Client, Variable # doctest: +SKIP
    >>> client = Client()  # doctest: +SKIP
    >>> x = Variable('x')  # doctest: +SKIP
    >>> x.set(123)  # docttest: +SKIP
    >>> x.get()  # docttest: +SKIP
    123
    >>> future = client.submit(f, x)  # doctest: +SKIP
    >>> x.set(future)  # doctest: +SKIP

    See Also
    --------
    Queue: shared multi-producer/multi-consumer queue between clients
    i    c         C` s2   | p t  ƒ  |  _ | p( d t j ƒ  j |  _ d  S(   Ns	   variable-(   R   R'   R?   R@   RA   R/   (   R"   R/   R'   t   maxsize(    (    s3   lib/python2.7/site-packages/distributed/variable.pyR#   –   s    c         c` s_   t  | t ƒ r; |  j j j d t | j ƒ d |  j ƒ Vn  |  j j j d | d |  j ƒ Vd  S(   NR0   R/   R1   (   t
   isinstanceR   R'   R   R   R
   R0   R/   (   R"   R%   (    (    s3   lib/python2.7/site-packages/distributed/variable.pyt   _setš   s    c         K` s   |  j  j |  j | |  S(   s°    Set the value of this variable

        Parameters
        ----------
        value: Future or object
            Must be either a Future or a msgpack-encodable value
        (   R'   t   syncRQ   (   R"   R%   t   kwargs(    (    s3   lib/python2.7/site-packages/distributed/variable.pyR   £   s    c         c` sé   |  j  j j d | d |  j d |  j  j ƒ V} | d d k rÌ t | d |  j  d t d | d ƒ} | d d	 k r’ | j j | d
 | d ƒ n  |  j  j	 i d d 6|  j d 6| d d 6| d d 6ƒ n
 | d } t
 j | ƒ ‚ d  S(   NR8   R/   R'   R$   R   R%   t   informR:   R;   R<   R=   s   variable-future-releaset   opR0   R7   (   R'   R   R   R/   t   idR   t   Truet   _statet	   set_errort   _send_to_schedulerR   RE   (   R"   R8   t   dR%   (    (    s3   lib/python2.7/site-packages/distributed/variable.pyt   _get­   s    &	

c         K` s   |  j  j |  j d | | S(   s     Get the value of this variable R8   (   R'   RR   R\   (   R"   R8   RS   (    (    s3   lib/python2.7/site-packages/distributed/variable.pyR   Â   s    c         C` s:   |  j  j d k r6 |  j  j i d d 6|  j d 6ƒ n  d S(   sn    Delete this variable

        Caution, this affects all clients currently pointing to this variable.
        t   runningR   RU   R/   N(   R'   t   statusRZ   R/   (   R"   (    (    s3   lib/python2.7/site-packages/distributed/variable.pyR    Æ   s    c         C` s   |  j  |  j j j f S(   N(   R/   R'   R   t   address(   R"   (    (    s3   lib/python2.7/site-packages/distributed/variable.pyt   __getstate__Î   s    c         C` sz   | \ } } y( t  | ƒ } | j j | k s3 t ‚ Wn) t t f k
 r_ t | d t ƒ} n X|  j d | d | ƒ d  S(   Nt   set_as_defaultR/   R'   (   R   R   R_   t   AssertionErrort   AttributeErrorR   t   FalseR#   (   R"   R:   R/   R_   R'   (    (    s3   lib/python2.7/site-packages/distributed/variable.pyt   __setstate__Ñ   s    N(   RJ   RK   RL   R)   R#   R   RM   RQ   R   R\   R   R    R`   Re   (    (    (    s3   lib/python2.7/site-packages/distributed/variable.pyRN   v   s   		
		(    t
   __future__R    R   R   t   collectionsR   t   loggingR?   R   R   t   tornado.lockst   cytoolzR   t   ImportErrort   toolzR'   R   R   R   t   metricsR	   t   utilsR
   R   t   workerR   t	   getLoggerRJ   t   loggert   objectR   RN   (    (    (    s3   lib/python2.7/site-packages/distributed/variable.pyt   <module>   s    _