B
    °F.\å  ã               @   s‚   d dl mZmZmZ d dlmZ d dlZd dlZd dlm	Z	m
Z
 d dlmZ ddlmZ ddlmZ e e¡ZG d	d
„ d
eƒZdS )é    )Úprint_functionÚdivisionÚabsolute_import)ÚdequeN)ÚgenÚlocks)ÚIOLoopé   )ÚCommClosedError)Úparse_timedeltac               @   sb   e Zd ZdZddd„Zdd„ Zdd„ Zd	d
„ ZeZe	j
dd„ ƒZdd„ Ze	j
dd„ ƒZdd„ ZdS )ÚBatchedSendaq   Batch messages in batches on a stream

    This takes an IOStream and an interval (in ms) and ensures that we send no
    more than one message every interval milliseconds.  We send lists of
    messages.

    Batching several messages at once helps performance when sending
    a myriad of tiny messages.

    Example
    -------
    >>> stream = yield connect(ip, port)
    >>> bstream = BatchedSend(interval='10 ms')
    >>> bstream.start(stream)
    >>> bstream.send('Hello,')
    >>> bstream.send('world!')

    On the other side, the recipient will get a message like the following::

        ['Hello,', 'world!']
    Nc             C   sx   |p
t  ¡ | _t|dd| _t ¡ | _t ¡ | _d| _	g | _
d | _d| _d| _d| _d | _ttj d¡d| _|| _d S )NZms)ÚdefaultFr   z+distributed.comm.recent-messages-log-length)Úmaxlen)r   ZcurrentÚloopr   Úintervalr   ZEventÚwakerÚstoppedÚplease_stopÚbufferÚcommÚmessage_countÚbatch_countÚ
byte_countÚnext_deadliner   ÚdaskZconfigÚgetÚrecent_message_logÚserializers)Úselfr   r   r   © r   ú2lib/python3.7/site-packages/distributed/batched.pyÚ__init__)   s    

zBatchedSend.__init__c             C   s   || _ | j | j¡ d S )N)r   r   Zadd_callbackÚ_background_send)r   r   r   r   r    Ústart9   s    zBatchedSend.startc             C   s   | j o| j  ¡ S )N)r   Úclosed)r   r   r   r    r$   =   s    zBatchedSend.closedc             C   s   |   ¡ rdS dt| jƒ S d S )Nz<BatchedSend: closed>z<BatchedSend: %d in buffer>)r$   Úlenr   )r   r   r   r    Ú__repr__@   s    zBatchedSend.__repr__c          
   c   sV  xD| j sFy| j | j¡V  | j ¡  W n tjk
r@   Y nX | jsPd | _q| jd k	rl| j 	¡ | jk rlq| jg  }| _|  j
d7  _
| j 	¡ | j | _z¢yJ| jj|| jddV }|dk rÊ| j |¡ n| j d¡ |  j|7  _W nR tk
r } zt d|¡ P W d d }~X Y n" tk
r8   t d¡ P Y nX W d d }X qW | j ¡  d S )Nr	   Úraise)r   Úon_errorg    €„.Azlarge-messagezBatched Comm Closed: %szError in batched write)r   r   Úwaitr   Úclearr   ÚTimeoutErrorr   r   Ztimer   r   r   Úwriter   r   Úappendr   r
   ÚloggerÚinfoÚ	ExceptionZ	exceptionr   Úset)r   ÚpayloadÚnbytesÚer   r   r    r"   H   s>    



zBatchedSend._background_sendc             C   sJ   | j dk	r| j  ¡ rt‚|  jd7  _| j |¡ | jdkrF| j ¡  dS )zl Schedule a message for sending to the other side

        This completes quickly and synchronously
        Nr	   )	r   r$   r
   r   r   r-   r   r   r1   )r   Úmsgr   r   r    Úsendo   s    
zBatchedSend.sendc             c   sˆ   | j dkrdS d| _| j ¡  | j ¡ V  | j  ¡ s„y.| jr`g | j | _}| j j|| j	ddV  W n t
k
rv   Y nX | j  ¡ V  dS )z- Flush existing messages and then close comm NTr'   )r   r(   )r   r   r   r1   r   r)   r$   r   r,   r   r
   Úclose)r   r2   r   r   r    r7   }   s    


zBatchedSend.closec             C   s<   | j d krd S d| _g | _| j ¡  | j  ¡ s8| j  ¡  d S )NT)r   r   r   r   r1   r$   Úabort)r   r   r   r    r8      s    


zBatchedSend.abort)NN)Ú__name__Ú
__module__Ú__qualname__Ú__doc__r!   r#   r$   r&   Ú__str__r   Ú	coroutiner"   r6   r7   r8   r   r   r   r    r      s   
'r   )Z
__future__r   r   r   Úcollectionsr   Zloggingr   Ztornador   r   Ztornado.ioloopr   Zcorer
   Zutilsr   Z	getLoggerr9   r.   Úobjectr   r   r   r   r    Ú<module>   s   
