B
    F.\                 @   s   d Z ddlZddlmZ ddlmZ ddlmZ ddlmZ ddl	m
Z
 dd	l
mZmZ dd
lmZ ddlmZ dd ZG dd deZG dd de
jZee
jZdS )zAExperimental interface for asyncio, may disappear without warning    N)wraps)merge)BaseAsyncIOLoop)to_asyncio_future   )client)ClientFuture)Variable)ignoringc                s   t  fdd}|S )z;Converts Tornado gen.coroutines and futures to asyncio onesc                 s    rt  |}t| |S )N)r   r   )argskwargs)default_kwargsfn 2lib/python3.7/site-packages/distributed/asyncio.pyconvert   s    
zto_asyncio.<locals>.convert)r   )r   r   r   r   )r   r   r   
to_asyncio   s    r   c                   sl   e Zd ZdZ fddZdd Zdd Zdd	 Zd
d Ze	e
jddZe	e
jZe	e
jZe	e
jZ  ZS )	AioClienta`   Connect to and drive computation on a distributed Dask cluster

    This class provides an asyncio compatible async/await interface for
    dask.distributed.

    The Client connects users to a dask.distributed compute cluster. It
    provides an asynchronous user interface around functions and futures.
    This class resembles executors in ``concurrent.futures`` but also
    allows ``Future`` objects within ``submit/map`` calls.

    AioClient is an **experimental** interface for distributed and may
    disappear without warning!

    Parameters
    ----------
    address: string, or Cluster
        This can be the address of a ``Scheduler`` server like a string
        ``'127.0.0.1:8786'`` or a cluster object like ``LocalCluster()``

    Examples
    --------
    Provide cluster's scheduler address on initialization::

        client = AioClient('127.0.0.1:8786')

    Start the client::

        async def start_the_client():
            client = await AioClient()

            # Use the client....

            await client.close()

    An ``async with`` statement is a more convenient way to start and shut down
    the client::

        async def start_the_client():
            async with AioClient() as client:
                # Use the client within this block.
                pass

    Use the ``submit`` method to send individual computations to the cluster,
    and await the returned future to retrieve the result::

        async def add_two_numbers():
            async with AioClient() as client:
                a = client.submit(add, 1, 2)
                result = await a

    Continue using submit or map on results to build up larger computations,
    and gather results with the ``gather`` method::

        async def gather_some_results():
            async with AioClient() as client:
                a = client.submit(add, 1, 2)
                b = client.submit(add, 10, 20)
                c = client.submit(add, a, b)
                result = await client.gather([c])

    See Also
    --------
    distributed.client.Client: Blocking Client
    distributed.scheduler.Scheduler: Internal scheduler
    c                s,   t  }t|}t j||dd| d S )NT)loopZasynchronous)asyncioZget_event_loopr   super__init__)selfr   r   r   Zioloop)	__class__r   r   r   _   s    zAioClient.__init__c             C   s   t dd S )Nz2Use AioClient in an 'async with' block, not 'with')RuntimeError)r   r   r   r   	__enter__d   s    zAioClient.__enter__c                s   t | jI d H  | S )N)r   _started)r   r   r   r   
__aenter__g   s    zAioClient.__aenter__c                s   t |  I d H  d S )N)r   Z_close)r   typevalue	tracebackr   r   r   	__aexit__k   s    zAioClient.__aexit__c             C   s   t | j S )N)r   r   	__await__)r   r   r   r   r#   n   s    zAioClient.__await__F)sync)__name__
__module____qualname____doc__r   r   r   r"   r#   r   r   getr$   closeZshutdown__classcell__r   r   )r   r   r      s   A

r   c               @   s   e Zd ZeejjZdS )as_completedN)r%   r&   r'   r   r   r,   	__anext__r   r   r   r   r,   w   s   r,   )r(   r   	functoolsr   Ztoolzr   Ztornado.platform.asyncior   r    r   r   r	   Zvariabler
   Zutilsr   r   r   r,   Z_waitwaitr   r   r   r   <module>   s   
Z