B
    F.\                 @   s   d dl mZ d dlZddlmZmZ ddlmZmZ ddl	m
Z
 ddlmZ ddlmZ dd	lmZ G d
d deZG dd deZG dd deZdS )    )genN   )Futuredefault_client)get_thread_identityQueue)to_serialize)sync)
WrappedKey)
get_workerc               @   s~   e Zd ZdZdddZdd Zdd Zed	d
 Zedd Z	edd Z
edd Zdd Zdd Zdd Zedd ZdS )ActoraF   Controls an object on a remote worker

    An actor allows remote control of a stateful object living on a remote
    worker.  Method calls on this object trigger operations on the remote
    object and return ActorFutures on which we can block to get results.

    Examples
    --------
    >>> class Counter:
    ...    def __init__(self):
    ...        self.n = 0
    ...    def increment(self):
    ...        self.n += 1
    ...        return self.n

    >>> from dask.distributed import Client
    >>> client = Client()

    You can create an actor by submitting a class with the keyword
    ``actor=True``.

    >>> future = client.submit(Counter, actor=True)
    >>> counter = future.result()
    >>> counter
    <Actor: Counter, key=Counter-1234abcd>

    Calling methods on this object immediately returns deferred ``ActorFuture``
    objects.  You can call ``.result()`` on these objects to block and get the
    result of the function call.

    >>> future = counter.increment()
    >>> future.result()
    1
    >>> future = counter.increment()
    >>> future.result()
    2
    Nc             C   s   || _ || _|| _d | _|r*|| _d | _nZyt | _W n tk
rP   d | _Y nX yt | _t	|| _W n tk
r   d | _Y nX d S )N)
_cls_addresskey_future_worker_clientr   
ValueErrorr   r   )selfclsaddressr   worker r   0lib/python3.7/site-packages/distributed/actor.py__init__2   s     zActor.__init__c             C   s   d| j j| jf S )Nz<Actor: %s, key=%s>)r   __name__r   )r   r   r   r   __repr__E   s    zActor.__repr__c             C   s   t | j| j| jffS )N)r   r   r   r   )r   r   r   r   
__reduce__H   s    zActor.__reduce__c             C   s   | j r| j jS | jjS d S )N)r   io_loopr   )r   r   r   r   _io_loopK   s    zActor._io_loopc             C   s   | j r| j jS | jjS d S )N)r   	schedulerr   )r   r   r   r   _scheduler_rpcR   s    zActor._scheduler_rpcc             C   s>   | j r| j | jS | jjr*| j| jS t| jj| jS d S )N)r   rpcr   r   Zdirect_to_workersProxyRPCr    )r   r   r   r   _worker_rpcY   s
    zActor._worker_rpcc             C   s    | j r| j jS t | jjkS d S )N)r   Zasynchronousr   r   Z	thread_id)r   r   r   r   _asynchronousc   s    zActor._asynchronousc             O   s4   | j r| j j|f||S t| jj|f||S d S )N)r   r	   r   Zloop)r   funcargskwargsr   r   r   _syncj   s    zActor._syncc             C   s2   t tt| }|dd t| jD  t|S )Nc             s   s   | ]}| d s|V  qdS )_N)
startswith).0attrr   r   r   	<genexpr>s   s    z Actor.__dir__.<locals>.<genexpr>)setdirtypeupdater   sorted)r   or   r   r   __dir__q   s    zActor.__dir__c                st   t j }jr.jjdkr.tdjj t|rRt| fdd}|S tj	 fdd}
|S d S )N)Zfinishedpendingz(Worker holding Actor was lost.  Status: c                 sZ   t j fddjr$ S t t jfdd}j| tjS d S )Nc              3   st   y4j jjdd  D dd  D dV } W n, tk
r`   jrTjV  ntdY nX t| d d S )Nc             S   s   g | ]}t |qS r   )r   )r,   argr   r   r   
<listcomp>   s    zYActor.__getattr__.<locals>.func.<locals>.run_actor_function_on_worker.<locals>.<listcomp>c             S   s   i | ]\}}t ||qS r   )r   )r,   kvr   r   r   
<dictcomp>   s    zYActor.__getattr__.<locals>.func.<locals>.run_actor_function_on_worker.<locals>.<dictcomp>)Zfunctionactorr'   r(   z Unable to contact Actor's workerresult)r$   Zactor_executer   itemsOSErrorr   r   Return)r=   )r'   r   r(   r   r   r   run_actor_function_on_worker   s    
zEActor.__getattr__.<locals>.func.<locals>.run_actor_function_on_workerc              3   s    V }   |  d S )N)Zput)x)qrA   r   r   wait_then_add_to_queue   s    z?Actor.__getattr__.<locals>.func.<locals>.wait_then_add_to_queue)r   	coroutiner%   r   r   Zadd_callbackActorFuture)r'   r(   rD   )r   r   )r'   r(   rC   rA   r   r&   }   s    zActor.__getattr__.<locals>.funcc              3   s&   j j jdV } t| d d S )N)Z	attributer<   r=   )r$   Zactor_attributer   r   r@   )rB   )r   r   r   r   get_actor_attribute_from_worker   s    z:Actor.__getattr__.<locals>.get_actor_attribute_from_worker)getattrr   r   Zstatusr   callable	functoolswrapsr   rE   r)   )r   r   r-   r&   rG   r   )r   r   r   __getattr__v   s     zActor.__getattr__c             C   s   | j jS )N)r   client)r   r   r   r   rM      s    zActor.client)N)r   
__module____qualname____doc__r   r   r   propertyr   r!   r$   r%   r)   r5   rL   rM   r   r   r   r   r      s   %

1r   c               @   s    e Zd ZdZdd Zdd ZdS )r#   zQ
    An rpc-like object that uses the scheduler's rpc to connect to a worker
    c             C   s   || _ || _d S )N)r"   r   )r   r"   r   r   r   r   r      s    zProxyRPC.__init__c                s   t j fdd}|S )Nc              ;   s*    | d< j jj| dV }t|d S )Nop)r   msg)r"   proxyr   r   r@   )rS   r=   )r   r   r   r   r&      s    z"ProxyRPC.__getattr__.<locals>.func)r   rE   )r   r   r&   r   )r   r   r   rL      s    zProxyRPC.__getattr__N)r   rN   rO   rP   r   rL   r   r   r   r   r#      s   r#   c               @   s*   e Zd ZdZdd Zd	ddZdd ZdS )
rF   a   Future to an actor's method call

    Whenever you call a method on an Actor you get an ActorFuture immediately
    while the computation happens in the background.  You can call ``.result``
    to block and collect the full result

    See Also
    --------
    Actor
    c             C   s   || _ || _d S )N)rC   r   )r   rC   r   r   r   r   r      s    zActorFuture.__init__Nc             C   s2   y| j S  tk
r,   | jj|d| _ | j S X d S )N)timeout)Z_cached_resultAttributeErrorrC   get)r   rU   r   r   r   r=      s
    zActorFuture.resultc             C   s   dS )Nz<ActorFuture>r   )r   r   r   r   r      s    zActorFuture.__repr__)N)r   rN   rO   rP   r   r=   r   r   r   r   r   rF      s   

rF   )Ztornador   rJ   rM   r   r   Zcompatibilityr   r   Zprotocolr   Zutilsr	   Z
utils_commr
   r   r   r   objectr#   rF   r   r   r   r   <module>   s    !