ó
ßüÚ\c           @` sģ   d  d l  m Z m Z m Z d  d l m Z d  d l Z d  d l Z d  d l m	 Z	 m
 Z
 d  d l m Z d d l m Z d d l m Z e j e  Z d	 e f d
     YZ d S(   i    (   t   print_functiont   divisiont   absolute_import(   t   dequeN(   t   gent   locks(   t   IOLoopi   (   t   CommClosedError(   t   parse_timedeltat   BatchedSendc           B` st   e  Z d  Z d	 d	 d  Z d   Z d   Z d   Z e Z e	 j
 d    Z d   Z e	 j
 d    Z d   Z RS(
   sq   Batch messages in batches on a stream

    This takes an IOStream and an interval (in ms) and ensures that we send no
    more than one message every interval milliseconds.  We send lists of
    messages.

    Batching several messages at once helps performance when sending
    a myriad of tiny messages.

    Example
    -------
    >>> stream = yield connect(ip, port)
    >>> bstream = BatchedSend(interval='10 ms')
    >>> bstream.start(stream)
    >>> bstream.send('Hello,')
    >>> bstream.send('world!')

    On the other side, the recipient will get a message like the following::

        ['Hello,', 'world!']
    c         C` sē   | p t  j   |  _ t | d d |  _ t j   |  _ t j   |  _ t	 |  _
 g  |  _ d  |  _ d |  _ d |  _ d |  _ d  |  _ t d t j j d   |  _ | |  _ d  S(   Nt   defaultt   msi    t   maxlens+   distributed.comm.recent-messages-log-length(   R   t   currentt   loopR   t   intervalR   t   Eventt   wakert   stoppedt   Falset   please_stopt   buffert   Nonet   commt   message_countt   batch_countt
   byte_countt   next_deadlineR   t   daskt   configt   gett   recent_message_logt   serializers(   t   selfR   R   R    (    (    s2   lib/python2.7/site-packages/distributed/batched.pyt   __init__*   s    							c         C` s    | |  _  |  j j |  j  d  S(   N(   R   R   t   add_callbackt   _background_send(   R!   R   (    (    s2   lib/python2.7/site-packages/distributed/batched.pyt   start<   s    	c         C` s   |  j  o |  j  j   S(   N(   R   t   closed(   R!   (    (    s2   lib/python2.7/site-packages/distributed/batched.pyR&   @   s    c         C` s%   |  j    r d Sd t |  j  Sd  S(   Ns   <BatchedSend: closed>s   <BatchedSend: %d in buffer>(   R&   t   lenR   (   R!   (    (    s2   lib/python2.7/site-packages/distributed/batched.pyt   __repr__C   s    c         c` s  x|  j  sy% |  j j |  j  V|  j j   Wn t j k
 rG n X|  j s` d  |  _ q n  |  j d  k	 r |  j	 j
   |  j k  r q n  |  j g  } |  _ |  j d 7_ |  j	 j
   |  j |  _ z­ yd |  j j | d |  j d d V} | d k  r|  j j |  n |  j j d  |  j | 7_ WnB t k
 rU} t j d |  Pn t k
 rst j d  Pn XWd  d  } Xq W|  j j   d  S(	   Ni   R    t   on_errort   raiseg    .As   large-messages   Batched Comm Closed: %ss   Error in batched write(   R   R   t   waitR   t   clearR   t   TimeoutErrorR   R   R   t   timeR   R   R   t   writeR    R   t   appendR   R   t   loggert   infot	   Exceptiont	   exceptionR   t   set(   R!   t   payloadt   nbytest   e(    (    s2   lib/python2.7/site-packages/distributed/batched.pyR$   K   s<    		' 		c         C` si   |  j  d k	 r' |  j  j   r' t  n  |  j d 7_ |  j j |  |  j d k re |  j j	   n  d S(   sl    Schedule a message for sending to the other side

        This completes quickly and synchronously
        i   N(
   R   R   R&   R   R   R   R0   R   R   R5   (   R!   t   msg(    (    s2   lib/python2.7/site-packages/distributed/batched.pyt   sendq   s    	c         c` sē   |  j  d k r d St |  _ |  j j   |  j j   V|  j  j   sŪ yC |  j	 r g  |  j	 |  _	 } |  j  j
 | d |  j d d Vn  Wn t k
 r n X|  j  j   Vn  d S(   s-    Flush existing messages and then close comm NR    R)   R*   (   R   R   t   TrueR   R   R5   R   R+   R&   R   R/   R    R   t   close(   R!   R6   (    (    s2   lib/python2.7/site-packages/distributed/batched.pyR<      s    			c         C` sU   |  j  d  k r d  St |  _ g  |  _ |  j j   |  j  j   sQ |  j  j   n  d  S(   N(	   R   R   R;   R   R   R   R5   R&   t   abort(   R!   (    (    s2   lib/python2.7/site-packages/distributed/batched.pyR=      s    		N(   t   __name__t
   __module__t   __doc__R   R"   R%   R&   R(   t   __str__R   t	   coroutineR$   R:   R<   R=   (    (    (    s2   lib/python2.7/site-packages/distributed/batched.pyR	      s   			&	(   t
   __future__R    R   R   t   collectionsR   t   loggingR   t   tornadoR   R   t   tornado.ioloopR   t   coreR   t   utilsR   t	   getLoggerR>   R1   t   objectR	   (    (    (    s2   lib/python2.7/site-packages/distributed/batched.pyt   <module>   s   