ó
ßüÚ\c           @` sť   d  d l  m Z m Z m Z d  d l Z d  d l m Z d d l m Z d d l	 m
 Z
 m Z d d l m Z e j e  Z d	 e f d
     YZ d   Z d   Z e j d    Z d S(   i    (   t   print_functiont   divisiont   absolute_importN(   t   geni   (   t   SchedulerPlugini   (   t   connectt   coerce_to_address(   t   dumps_functiont   EventStreamc           B` s#   e  Z d  Z d d  Z d   Z RS(   s"    Maintain a copy of worker events c         C` s#   g  |  _  | r | j |   n  d  S(   N(   t   buffert
   add_plugin(   t   selft	   scheduler(    (    sB   lib/python2.7/site-packages/distributed/diagnostics/eventstream.pyt   __init__   s    	c         O` sH   | d k rD | | d <| d k s. | d k rD |  j  j |  qD n  d  S(   Nt
   processingt   keyt   memoryt   erred(   R	   t   append(   R   R   t   startt   finisht   argst   kwargs(    (    sB   lib/python2.7/site-packages/distributed/diagnostics/eventstream.pyt
   transition   s    
N(   t   __name__t
   __module__t   __doc__t   NoneR   R   (    (    (    sB   lib/python2.7/site-packages/distributed/diagnostics/eventstream.pyR      s   c         C` s   g  | j  | _  } | S(   N(   R	   (   R   t   esR	   (    (    sB   lib/python2.7/site-packages/distributed/diagnostics/eventstream.pyt   swap_buffer   s    c         C` s   |  j  |  d  S(   N(   t   remove_plugin(   R   R   (    (    sB   lib/python2.7/site-packages/distributed/diagnostics/eventstream.pyt   teardown$   s    c         c` so   t  |   }  t |   V} | j i d d 6t t  d 6t t  d 6| d 6t t  d 6 Vt j |   d S(   s   Open a TCP connection to scheduler, receive batched task messages

    The messages coming back are lists of dicts.  Each dict is of the following
    form::

        {'key': 'mykey', 'worker': 'host:port', 'status': status,
         'compute_start': time(), 'compute_stop': time(),
         'transfer_start': time(), 'transfer_stop': time(),
         'disk_load_start': time(), 'disk_load_stop': time(),
         'other': 'junk'}

    Where ``status`` is either 'OK', or 'error'

    Parameters
    ----------
    address: address of scheduler
    interval: time between batches, in seconds

    Examples
    --------
    >>> stream = yield eventstream('127.0.0.1:8786', 0.100)  # doctest: +SKIP
    >>> print(yield read(stream))  # doctest: +SKIP
    [{'key': 'x', 'status': 'OK', 'worker': '192.168.0.1:54684', ...},
     {'key': 'y', 'status': 'error', 'worker': '192.168.0.1:54684', ...}]
    t   feedt   opt   setupt   functiont   intervalR   N(	   R   R   t   writeR   R   R   R   R   t   Return(   t   addressR$   t   comm(    (    sB   lib/python2.7/site-packages/distributed/diagnostics/eventstream.pyt   eventstream(   s    (   t
   __future__R    R   R   t   loggingt   tornadoR   t   pluginR   t   coreR   R   t   workerR   t	   getLoggerR   t   loggerR   R   R   t	   coroutineR)   (    (    (    sB   lib/python2.7/site-packages/distributed/diagnostics/eventstream.pyt   <module>   s   		