ó
ßüÚ\c           @` s‰  d  d l  m Z m Z m Z d  d l m Z m Z m Z d  d l m	 Z	 d  d l
 Z
 d  d l Z d  d l Z d  d l m Z d  d l m Z d d l m Z d d	 l m Z d
 d l m Z d
 d l m Z e
 j e ƒ Z d e f d „  ƒ  YZ d e f d „  ƒ  YZ d e e ƒ f d „  ƒ  YZ d e e ƒ f d „  ƒ  YZ  d e e ƒ f d „  ƒ  YZ! e j" d e$ d d „ ƒ Z% e$ d d „ Z& d S(   i    (   t   print_functiont   divisiont   absolute_import(   t   ABCMetat   abstractmethodt   abstractproperty(   t	   timedeltaN(   t   with_metaclass(   t   geni   (   t   time(   t   parse_timedeltai   (   t   registry(   t   parse_addresst   CommClosedErrorc           B` s   e  Z RS(    (   t   __name__t
   __module__(    (    (    s4   lib/python2.7/site-packages/distributed/comm/core.pyR      s   t   FatalCommClosedErrorc           B` s   e  Z RS(    (   R   R   (    (    (    s4   lib/python2.7/site-packages/distributed/comm/core.pyR      s   t   Commc           B` sª   e  Z d  Z e j ƒ  Z d „  Z e d d „ ƒ Z	 e d d „ ƒ Z
 e d „  ƒ Z e d „  ƒ Z e d „  ƒ Z e d „  ƒ Z e d „  ƒ Z e d	 „  ƒ Z d
 „  Z RS(   sØ  
    A message-oriented communication object, representing an established
    communication channel.  There should be only one reader and one
    writer at a time: to manage current communications, even with a
    single peer, you must create distinct ``Comm`` objects.

    Messages are arbitrary Python objects.  Concrete implementations
    of this class can implement different serialization mechanisms
    depending on the underlying transport's characteristics.
    c         C` s   |  j  j |  ƒ d  |  _ d  S(   N(   t
   _instancest   addt   Nonet   name(   t   self(    (    s4   lib/python2.7/site-packages/distributed/comm/core.pyt   __init__+   s    c         C` s   d S(   sU  
        Read and return a message (a Python object).

        This method is a coroutine.

        Parameters
        ----------
        deserializers : Optional[Dict[str, Tuple[Callable, Callable, bool]]]
            An optional dict appropriate for distributed.protocol.deserialize.
            See :ref:`serialization` for more.
        N(    (   R   t   deserializers(    (    s4   lib/python2.7/site-packages/distributed/comm/core.pyt   read1   t    c         C` s   d S(   s(  
        Write a message (a Python object).

        This method is a coroutine.

        Parameters
        ----------
        msg :
        on_error : Optional[str]
            The behavior when serialization fails. See
            ``distributed.protocol.core.dumps`` for valid values.
        N(    (   R   t   msgt   on_error(    (    s4   lib/python2.7/site-packages/distributed/comm/core.pyt   write?   R   c         C` s   d S(   s¾   
        Close the communication cleanly.  This will attempt to flush
        outgoing buffers before actually closing the underlying transport.

        This method is a coroutine.
        N(    (   R   (    (    s4   lib/python2.7/site-packages/distributed/comm/core.pyt   closeN   R   c         C` s   d S(   s„   
        Close the communication immediately and abruptly.
        Useful in destructors or generators' ``finally`` blocks.
        N(    (   R   (    (    s4   lib/python2.7/site-packages/distributed/comm/core.pyt   abortW   R   c         C` s   d S(   s6   
        Return whether the stream is closed.
        N(    (   R   (    (    s4   lib/python2.7/site-packages/distributed/comm/core.pyt   closed^   R   c         C` s   d S(   sN   
        The local address.  For logging and debugging purposes only.
        N(    (   R   (    (    s4   lib/python2.7/site-packages/distributed/comm/core.pyt   local_addressd   R   c         C` s   d S(   sO   
        The peer's address.  For logging and debugging purposes only.
        N(    (   R   (    (    s4   lib/python2.7/site-packages/distributed/comm/core.pyt   peer_addressj   R   c         C` s   i  S(   sá   
        Return backend-specific information about the communication,
        as a dict.  Typically, this is information which is initialized
        when the communication is established and doesn't vary afterwards.
        (    (   R   (    (    s4   lib/python2.7/site-packages/distributed/comm/core.pyt
   extra_infop   s    c         C` sJ   |  j  j } |  j ƒ  r# d | f Sd | |  j p5 d |  j |  j f Sd  S(   Ns   <closed %s>s   <%s %s local=%s remote=%s>R   (   t	   __class__R   R    R   R!   R"   (   R   t   clsname(    (    s4   lib/python2.7/site-packages/distributed/comm/core.pyt   __repr__y   s    N(   R   R   t   __doc__t   weakreft   WeakSetR   R   R   R   R   R   R   R   R    R   R!   R"   t   propertyR#   R&   (    (    (    s4   lib/python2.7/site-packages/distributed/comm/core.pyR      s   
			t   Listenerc           B` sV   e  Z e d  „  ƒ Z e d „  ƒ Z e d „  ƒ Z e d „  ƒ Z d „  Z d „  Z	 RS(   c         C` s   d S(   s;   
        Start listening for incoming connections.
        N(    (   R   (    (    s4   lib/python2.7/site-packages/distributed/comm/core.pyt   start‡   R   c         C` s   d S(   s†   
        Stop listening.  This does not shutdown already established
        communications, but prevents accepting new ones.
        N(    (   R   (    (    s4   lib/python2.7/site-packages/distributed/comm/core.pyt   stop   R   c         C` s   d S(   s8   
        The listening address as a URI string.
        N(    (   R   (    (    s4   lib/python2.7/site-packages/distributed/comm/core.pyt   listen_address”   R   c         C` s   d S(   sÀ   
        An address this listener can be contacted on.  This can be
        different from `listen_address` if the latter is some wildcard
        address such as 'tcp://0.0.0.0:123'.
        N(    (   R   (    (    s4   lib/python2.7/site-packages/distributed/comm/core.pyt   contact_addressš   R   c         C` s   |  j  ƒ  |  S(   N(   R,   (   R   (    (    s4   lib/python2.7/site-packages/distributed/comm/core.pyt	   __enter__¢   s    
c         G` s   |  j  ƒ  d  S(   N(   R-   (   R   t   exc(    (    s4   lib/python2.7/site-packages/distributed/comm/core.pyt   __exit__¦   s    (
   R   R   R   R,   R-   R   R.   R/   R0   R2   (    (    (    s4   lib/python2.7/site-packages/distributed/comm/core.pyR+   †   s   	t	   Connectorc           B` s   e  Z e e d  „ ƒ Z RS(   c         C` s   d S(   s	  
        Connect to the given address and return a Comm object.
        This function is a coroutine.   It may raise EnvironmentError
        if the other endpoint is unreachable or unavailable.  It
        may raise ValueError if the address is malformed.
        N(    (   R   t   addresst   deserialize(    (    s4   lib/python2.7/site-packages/distributed/comm/core.pyt   connect«   R   (   R   R   R   t   TrueR6   (    (    (    s4   lib/python2.7/site-packages/distributed/comm/core.pyR3   ª   s   c         #` s€  ˆ d	 k r! t j j d ƒ ‰ n  t ˆ d d ƒ‰ t ˆ  ƒ \ } } t j | ƒ } | j ƒ  } t	 ƒ  } | ˆ }	 d	 }
 ‡  ‡ f d †  } xß t
 rlyK | j | d | | p¯ i  } t j t d |	 t	 ƒ  ƒ | d t ƒV} Wn† t k
 rõ ‚  qŽ t k
 rJ} t | ƒ }
 t	 ƒ  |	 k  r=t j d ƒ Vt j d ƒ qi| |
 ƒ qŽ t j k
 rg| |
 ƒ qŽ XPqŽ Wt j | ƒ ‚ d	 S(
   sÄ   
    Connect to the given address (a URI such as ``tcp://127.0.0.1:1234``)
    and yield a ``Comm`` object.  If the connection attempt fails, it is
    retried until the *timeout* is expired.
    s!   distributed.comm.timeouts.connectt   defaultt   secondsc         ` s/   |  p	 d }  d ˆ  ˆ |  f } t  | ƒ ‚ d  S(   Ns   connect() didn't finish in times0   Timed out trying to connect to %r after %s s: %s(   t   IOError(   t   errorR   (   t   addrt   timeout(    s4   lib/python2.7/site-packages/distributed/comm/core.pyt   _raiseÈ   s    
R5   t   quiet_exceptionsg{®Gáz„?s   sleeping on connectN(   R   t   daskt   configt   getR
   R   R   t   get_backendt   get_connectorR	   R7   R6   R   t   with_timeoutR   t   EnvironmentErrorR   t   strt   sleept   loggert   debugt   TimeoutErrort   Return(   R<   R=   R5   t   connection_argst   schemet   loct   backendt	   connectorR,   t   deadlineR;   R>   t   futuret   commt   e(    (   R<   R=   s4   lib/python2.7/site-packages/distributed/comm/core.pyR6   µ   s<    	

	c         C` sŸ   y t  |  d t ƒ\ } } WnU t k
 rs | rN | j d ƒ rN d |  }  n
 d |  }  t  |  d t ƒ\ } } n Xt j | ƒ } | j | | | | p› i   S(   sJ  
    Create a listener object with the given parameters.  When its ``start()``
    method is called, the listener will listen on the given address
    (a URI such as ``tcp://0.0.0.0``) and call *handle_comm* with a
    ``Comm`` object for each incoming connection.

    *handle_comm* can be a regular function or a coroutine.
    t   strictt   ssl_contexts   tls://s   tcp://(   R   R7   t
   ValueErrorRB   R   RC   t   get_listener(   R<   t   handle_commR5   RM   RN   RO   RP   (    (    s4   lib/python2.7/site-packages/distributed/comm/core.pyt   listení   s    	
('   t
   __future__R    R   R   t   abcR   R   R   t   datetimeR   t   loggingR(   R@   t   sixR   t   tornadoR   t   metricsR	   t   utilsR
   R   R   t
   addressingR   t	   getLoggerR   RI   R:   R   R   R   R+   R3   t	   coroutineR   R7   R6   R[   (    (    (    s4   lib/python2.7/site-packages/distributed/comm/core.pyt   <module>   s(   i$7