B
    °F.\Š  ã               @   sŒ   d dl mZmZmZ d dlZd dlmZ ddlmZ ddl	m
Z
mZ ddlmZ e e¡ZG d	d
„ d
eƒZdd„ Zdd„ Zejdd„ ƒZdS )é    )Úprint_functionÚdivisionÚabsolute_importN)Úgené   )ÚSchedulerPluginé   )ÚconnectÚcoerce_to_address)Údumps_functionc               @   s"   e Zd ZdZddd„Zdd„ ZdS )ÚEventStreamz" Maintain a copy of worker events Nc             C   s   g | _ |r| | ¡ d S )N)ÚbufferZ
add_plugin)ÚselfÚ	scheduler© r   úBlib/python3.7/site-packages/distributed/diagnostics/eventstream.pyÚ__init__   s    zEventStream.__init__c             O   s0   |dkr,||d< |dks |dkr,| j  |¡ d S )NZ
processingÚkeyZmemoryZerred)r   Úappend)r   r   ÚstartZfinishÚargsÚkwargsr   r   r   Ú
transition   s    zEventStream.transition)N)Ú__name__Ú
__module__Ú__qualname__Ú__doc__r   r   r   r   r   r   r      s   
r   c             C   s   g |j  |_ }|S )N)r   )r   Úesr   r   r   r   Úswap_buffer   s    r   c             C   s   |   |¡ d S )N)Zremove_plugin)r   r   r   r   r   Úteardown$   s    r   c             c   sD   t | ƒ} t| ƒV }| dttƒttƒ|ttƒdœ¡V  t |¡‚dS )aƒ   Open a TCP connection to scheduler, receive batched task messages

    The messages coming back are lists of dicts.  Each dict is of the following
    form::

        {'key': 'mykey', 'worker': 'host:port', 'status': status,
         'compute_start': time(), 'compute_stop': time(),
         'transfer_start': time(), 'transfer_stop': time(),
         'disk_load_start': time(), 'disk_load_stop': time(),
         'other': 'junk'}

    Where ``status`` is either 'OK', or 'error'

    Parameters
    ----------
    address: address of scheduler
    interval: time between batches, in seconds

    Examples
    --------
    >>> stream = yield eventstream('127.0.0.1:8786', 0.100)  # doctest: +SKIP
    >>> print(yield read(stream))  # doctest: +SKIP
    [{'key': 'x', 'status': 'OK', 'worker': '192.168.0.1:54684', ...},
     {'key': 'y', 'status': 'error', 'worker': '192.168.0.1:54684', ...}]
    Zfeed)ÚopZsetupZfunctionÚintervalr   N)	r
   r	   Úwriter   r   r   r   r   ZReturn)Zaddressr!   Zcommr   r   r   Úeventstream(   s    
r#   )Z
__future__r   r   r   ZloggingZtornador   Zpluginr   Zcorer	   r
   Zworkerr   Z	getLoggerr   Zloggerr   r   r   Ú	coroutiner#   r   r   r   r   Ú<module>   s   
