ó
ßüÚ\c           @   sÎ   d  d l  m Z d  d l Z d d l m Z m Z d d l m Z m Z d d l	 m
 Z
 d d l m Z d d l m Z d d	 l m Z d
 e f d „  ƒ  YZ d e f d „  ƒ  YZ d e f d „  ƒ  YZ d S(   iÿÿÿÿ(   t   genNi   (   t   Futuret   default_client(   t   get_thread_identityt   Queue(   t   to_serialize(   t   sync(   t
   WrappedKey(   t
   get_workert   Actorc           B   s’   e  Z d  Z d d „ Z d „  Z d „  Z e d „  ƒ Z e d „  ƒ Z	 e d „  ƒ Z
 e d „  ƒ Z d „  Z d	 „  Z d
 „  Z e d „  ƒ Z RS(   sF   Controls an object on a remote worker

    An actor allows remote control of a stateful object living on a remote
    worker.  Method calls on this object trigger operations on the remote
    object and return ActorFutures on which we can block to get results.

    Examples
    --------
    >>> class Counter:
    ...    def __init__(self):
    ...        self.n = 0
    ...    def increment(self):
    ...        self.n += 1
    ...        return self.n

    >>> from dask.distributed import Client
    >>> client = Client()

    You can create an actor by submitting a class with the keyword
    ``actor=True``.

    >>> future = client.submit(Counter, actor=True)
    >>> counter = future.result()
    >>> counter
    <Actor: Counter, key=Counter-1234abcd>

    Calling methods on this object immediately returns deferred ``ActorFuture``
    objects.  You can call ``.result()`` on these objects to block and get the
    result of the function call.

    >>> future = counter.increment()
    >>> future.result()
    1
    >>> future = counter.increment()
    >>> future.result()
    2
    c         C   s¬   | |  _  | |  _ | |  _ d  |  _ | r? | |  _ d  |  _ ni y t ƒ  |  _ Wn t k
 rk d  |  _ n Xy t	 ƒ  |  _ t
 | ƒ |  _ Wn t k
 r§ d  |  _ n Xd  S(   N(   t   _clst   _addresst   keyt   Nonet   _futuret   _workert   _clientR   t
   ValueErrorR   R   (   t   selft   clst   addressR   t   worker(    (    s0   lib/python2.7/site-packages/distributed/actor.pyt   __init__3   s     					c         C   s   d |  j  j |  j f S(   Ns   <Actor: %s, key=%s>(   R
   t   __name__R   (   R   (    (    s0   lib/python2.7/site-packages/distributed/actor.pyt   __repr__F   s    c         C   s   t  |  j |  j |  j f f S(   N(   R	   R
   R   R   (   R   (    (    s0   lib/python2.7/site-packages/distributed/actor.pyt
   __reduce__I   s    c         C   s!   |  j  r |  j  j S|  j j Sd  S(   N(   R   t   io_loopR   (   R   (    (    s0   lib/python2.7/site-packages/distributed/actor.pyt   _io_loopL   s    	
c         C   s!   |  j  r |  j  j S|  j j Sd  S(   N(   R   t	   schedulerR   (   R   (    (    s0   lib/python2.7/site-packages/distributed/actor.pyt   _scheduler_rpcS   s    	
c         C   sU   |  j  r |  j  j |  j ƒ S|  j j r; |  j j |  j ƒ St |  j j |  j ƒ Sd  S(   N(   R   t   rpcR   R   t   direct_to_workerst   ProxyRPCR   (   R   (    (    s0   lib/python2.7/site-packages/distributed/actor.pyt   _worker_rpcZ   s
    	c         C   s*   |  j  r |  j  j St ƒ  |  j j k Sd  S(   N(   R   t   asynchronousR   R   t	   thread_id(   R   (    (    s0   lib/python2.7/site-packages/distributed/actor.pyt   _asynchronousd   s    	
c         O   s<   |  j  r |  j  j | | | Ž St |  j j | | | Ž Sd  S(   N(   R   R   R   t   loop(   R   t   funct   argst   kwargs(    (    s0   lib/python2.7/site-packages/distributed/actor.pyt   _synck   s    	c         C   sB   t  t t |  ƒ ƒ ƒ } | j d „  t |  j ƒ Dƒ ƒ t | ƒ S(   Nc         s   s$   |  ] } | j  d  ƒ s | Vq d S(   t   _N(   t
   startswith(   t   .0t   attr(    (    s0   lib/python2.7/site-packages/distributed/actor.pys	   <genexpr>t   s    (   t   sett   dirt   typet   updateR
   t   sorted(   R   t   o(    (    s0   lib/python2.7/site-packages/distributed/actor.pyt   __dir__r   s     c            s£   t  ˆ j ˆ  ƒ } ˆ j rF ˆ j j d k rF t d ˆ j j ƒ ‚ n  t | ƒ rw t j | ƒ ‡  ‡ f d †  ƒ } | St j	 ‡  ‡ f d †  ƒ } ˆ j
 | ƒ Sd  S(   Nt   finishedt   pendings(   Worker holding Actor was lost.  Status: c             sy   t  j ‡  ‡ ‡ ‡ f d †  ƒ ‰ ˆ j r1 ˆ ƒ  St ƒ  ‰ t  j ‡ ‡ f d †  ƒ } ˆ j j | ƒ t ˆ ˆ j ƒ Sd  S(   Nc       
   3   s£   yX ˆ j  j d ˆ d ˆ j d g  ˆ  D] }  t |  ƒ ^ q% d d „  ˆ j ƒ  Dƒ ƒ V} Wn1 t k
 r‹ ˆ j r| ˆ j VqŒ t d ƒ ‚ n Xt j | d ƒ ‚ d  S(   Nt   functiont   actorR'   R(   c         S   s%   i  |  ] \ } } t  | ƒ | “ q S(    (   R   (   R,   t   kt   v(    (    s0   lib/python2.7/site-packages/distributed/actor.pys
   <dictcomp>Š   s   	 s    Unable to contact Actor's workert   result(	   R!   t   actor_executeR   R   t   itemst   OSErrorR   R    t   Return(   t   argR;   (   R'   R   R(   R   (    s0   lib/python2.7/site-packages/distributed/actor.pyt   run_actor_function_on_workerƒ   s    		c          3   s   ˆ ƒ  V}  ˆ  j  |  ƒ d  S(   N(   t   put(   t   x(   t   qRA   (    s0   lib/python2.7/site-packages/distributed/actor.pyt   wait_then_add_to_queueš   s    
(   R    t	   coroutineR$   R   R   t   add_callbackt   ActorFuture(   R'   R(   RE   (   R   R   (   R'   R(   RD   RA   s0   lib/python2.7/site-packages/distributed/actor.pyR&      s    !		c          3   s6   ˆ j  j d ˆ  d ˆ j ƒ V}  t j |  d ƒ ‚ d  S(   Nt	   attributeR8   R;   (   R!   t   actor_attributeR   R    R?   (   RC   (   R   R   (    s0   lib/python2.7/site-packages/distributed/actor.pyt   get_actor_attribute_from_worker§   s    (   R5   R6   (   t   getattrR
   R   t   statusR   t   callablet	   functoolst   wrapsR    RF   R)   (   R   R   R-   R&   RK   (    (   R   R   s0   lib/python2.7/site-packages/distributed/actor.pyt   __getattr__w   s    !"c         C   s
   |  j  j S(   N(   R   t   client(   R   (    (    s0   lib/python2.7/site-packages/distributed/actor.pyRR   °   s    N(   R   t
   __module__t   __doc__R   R   R   R   t   propertyR   R   R!   R$   R)   R4   RQ   RR   (    (    (    s0   lib/python2.7/site-packages/distributed/actor.pyR	      s   %		
			9R    c           B   s    e  Z d  Z d „  Z d „  Z RS(   sQ   
    An rpc-like object that uses the scheduler's rpc to connect to a worker
    c         C   s   | |  _  | |  _ d  S(   N(   R   R   (   R   R   R   (    (    s0   lib/python2.7/site-packages/distributed/actor.pyR   º   s    	c            s   t  j ‡  ‡ f d †  ƒ } | S(   Nc          ;   s<   ˆ  |  d <ˆ j  j d ˆ j d |  ƒ V} t j | ƒ ‚ d  S(   Nt   opR   t   msg(   R   t   proxyR   R    R?   (   RW   R;   (   R   R   (    s0   lib/python2.7/site-packages/distributed/actor.pyR&   ¿   s    
(   R    RF   (   R   R   R&   (    (   R   R   s0   lib/python2.7/site-packages/distributed/actor.pyRQ   ¾   s    (   R   RS   RT   R   RQ   (    (    (    s0   lib/python2.7/site-packages/distributed/actor.pyR    µ   s   	RH   c           B   s,   e  Z d  Z d „  Z d d „ Z d „  Z RS(   s   Future to an actor's method call

    Whenever you call a method on an Actor you get an ActorFuture immediately
    while the computation happens in the background.  You can call ``.result``
    to block and collect the full result

    See Also
    --------
    Actor
    c         C   s   | |  _  | |  _ d  S(   N(   RD   R   (   R   RD   R   (    (    s0   lib/python2.7/site-packages/distributed/actor.pyR   Ô   s    	c         C   s?   y |  j  SWn- t k
 r: |  j j d | ƒ |  _  |  j  SXd  S(   Nt   timeout(   t   _cached_resultt   AttributeErrorRD   t   get(   R   RY   (    (    s0   lib/python2.7/site-packages/distributed/actor.pyR;   Ø   s
    c         C   s   d S(   Ns   <ActorFuture>(    (   R   (    (    s0   lib/python2.7/site-packages/distributed/actor.pyR   ß   s    N(   R   RS   RT   R   R   R;   R   (    (    (    s0   lib/python2.7/site-packages/distributed/actor.pyRH   È   s   
	(   t   tornadoR    RO   RR   R   R   t   compatibilityR   R   t   protocolR   t   utilsR   t
   utils_commR   R   R   R	   t   objectR    RH   (    (    (    s0   lib/python2.7/site-packages/distributed/actor.pyt   <module>   s   ©