
\c           @` s  d  d l  m Z m Z m Z d  d l m Z m Z d  d l m Z d  d l	 m
 Z
 d  d l Z d  d l Z d  d l Z d  d l Z d  d l Z d  d l Z d  d l m Z d  d l m Z d  d l m Z d  d	 l m Z d  d
 l m Z d d l m Z d d l m Z m Z m Z m  Z  m! Z! m" Z" d d l# m$ Z$ d d l% m& Z& d d l' m( Z( d d l) m* Z* m+ Z+ m, Z, m- Z- m. Z. m/ Z/ m0 Z0 d d l% m1 Z1 d e2 f d     YZ3 e j4 e5  Z6 d   Z7 d   Z8 e7   Z9 e/ e j: j; d  d d Z< e j: j; d  Z= d e> f d     YZ? d   Z@ e jA eB eC eC d   ZD eC eC eC d  ZE d  e> f d!     YZF d" e> f d#     YZG d$ e> f d%     YZH d&   ZI d' d(  ZJ d)   ZK d S(*   i    (   t   print_functiont   divisiont   absolute_import(   t   defaultdictt   deque(   t   CancelledError(   t   partialN(   t   string_types(   t   merge(   t   gen(   t   IOLoop(   t   Eventi   (   t   get_thread_identity(   t   connectt   listent   CommClosedErrort   normalize_addresst   unparse_host_portt   get_address_host_port(   t   time(   t   profile(   t   SystemMonitor(   t   get_tracebackt   truncate_exceptiont   ignoringt   shutting_downt   PeriodicCallbackt   parse_timedeltat   has_keyword(   t   protocolt	   RPCClosedc           B` s   e  Z RS(    (   t   __name__t
   __module__(    (    (    s/   lib/python2.7/site-packages/distributed/core.pyR   +   s   c          C` s:   y! d d  l  }  |  j   j d SWn t k
 r5 d SXd  S(   Ni    i   g    eA(   t   psutilt   virtual_memoryt   totalt   ImportError(   R!   (    (    s/   lib/python2.7/site-packages/distributed/core.pyt   get_total_physical_memory2   s
    c         ` s     f d   } | S(   Nc          ` s
      d  S(   N(    (   t   argst   kwargs(   t   exc(    s/   lib/python2.7/site-packages/distributed/core.pyt   _raise<   s    (    (   R(   R)   (    (   R(   s/   lib/python2.7/site-packages/distributed/core.pyt   raise_later;   s    s   distributed.admin.tick.limitt   defaultt   mss   distributed.admin.pdb-on-errt   Serverc           B` s   e  Z d  Z d Z d Z d d d e d d  Z d   Z d   Z	 d   Z
 d   Z e d	    Z e d
    Z e d    Z d d  Z d d d  Z e j e d   Z e j d g  d   Z e j d    Z RS(   sO   Distributed TCP Server

    Superclass for endpoints in a distributed cluster, such as Worker
    and Scheduler objects.

    **Handlers**

    Servers define operations with a ``handlers`` dict mapping operation names
    to functions.  The first argument of a handler function will be a ``Comm``
    for the communication established with the client.  Other arguments
    will receive inputs from the keys of the incoming message which will
    always be a dictionary.

    >>> def pingpong(comm):
    ...     return b'pong'

    >>> def add(comm, x, y):
    ...     return x + y

    >>> handlers = {'ping': pingpong, 'add': add}
    >>> server = Server(handlers)  # doctest: +SKIP
    >>> server.listen('tcp://0.0.0.0:8000')  # doctest: +SKIP

    **Message Format**

    The server expects messages to be dictionaries with a special key, `'op'`
    that corresponds to the name of the operation, and other key-value pairs as
    required by the function.

    So in the example above the following would be good messages.

    *  ``{'op': 'ping'}``
    *  ``{'op': 'add', 'x': 10, 'y': 20}``

    t    i    i   c      	   ` s<  i  j  d 6 j d 6 _  j j |  | d  k rd t j j d t   j	 j
   g   } n  |  _ i   _  j j | p i   t   j	 d t t j     _ d   _ d   _ d   _ i   _ |  _ t    _ d   _ d   _ d   _ d   _ t j    _ d   _ | p6t  j!    _"  j"  _# t$  j" d  st j%  j"    t$  j" d  r  f d   } n   f d   } t& j' d	 d! d t j j d  d t j j d  d |   j" _& n  t( t)  3 d d l* m+ } t, t- | d  j"   _ Wd  QXd d l* m. }	 t, t- |	 d  j"   _ t, d     _ t, d     _ t/    _0 t1  j j d d  j" }
 |
  j0 d <t2    _3 t1  j4 t5 t j j d  d d d d  j" }
 |
  j0 d <d  _6 t7 j8  f d     }  j" j9 |  t:  _; d  S("   Nt   identityt   connection_streams   distributed.%s.blocked-handlerst   -R   t   closingc          ` s       }  |  d  k p |  j S(   N(   t   NoneR2   (   t   loop(   t   ref(    s/   lib/python2.7/site-packages/distributed/core.pyt   stop   s    	c          ` s       }  |  d  k p |  j S(   N(   R3   t   _closing(   R4   (   R5   (    s/   lib/python2.7/site-packages/distributed/core.pyR6      s    	t   omits
   profile.pys   selectors.pyt   intervals#   distributed.worker.profile.intervalt   cycles    distributed.worker.profile.cycleR6   i   (   t   DigestR4   (   t   Counterc           S` s   t  d d  S(   Nt   maxleni'  (   R   (    (    (    s/   lib/python2.7/site-packages/distributed/core.pyt   <lambda>   R.   c           S` s   d S(   Ni    (    (    (    (    s/   lib/python2.7/site-packages/distributed/core.pyR>      R.   i  t   io_loopt   monitors   distributed.admin.tick.intervalR+   R,   i  t   ticki    c           ` s   t      _ d  S(   N(   R   t	   thread_id(    (   t   self(    s/   lib/python2.7/site-packages/distributed/core.pyt   set_thread_ident   s    (   s
   profile.pys   selectors.py(<   R/   t   handle_streamt   handlerst   updateR3   t   daskt   configt   gett   typeR   t   lowert   blocked_handlerst   stream_handlerst   strt   uuidt   uuid4t   idt   _addresst   _listen_addresst   _portt   _commst   deserializeR   R@   t   counterst   digestst   eventst   event_countst   weakreft   WeakSett   _ongoing_coroutinest   listenerR
   t   currentR?   R4   t   hasattrR5   R   t   watchR   R$   t   counterR;   R   R   R<   t   dictt   periodic_callbacksR   R   t
   _last_tickt   _measure_tickR   RB   R	   t	   coroutinet   add_callbackt   Falset   _Server__stopped(   RC   RF   RM   RN   t   connection_limitRW   R?   R6   R;   R<   t   pcRD   (    (   R5   RC   s/   lib/python2.7/site-packages/distributed/core.pyt   __init__s   sr    	
	"		&											$	c         ` s/   t      _   f d   }   j j |  d S(   s    Start Periodic Callbacks consistently

        This starts all PeriodicCallbacks stored in self.periodic_callbacks if
        they are not yet running.  It does this safely on the IOLoop.
        c          ` s7   x0   j  j   D] }  |  j   s |  j   q q Wd  S(   N(   Re   t   valuest
   is_runningt   start(   Rm   (   RC   (    s/   lib/python2.7/site-packages/distributed/core.pyt	   start_pcs   s    N(   R   Rf   R?   Ri   (   RC   Rr   (    (   RC   s/   lib/python2.7/site-packages/distributed/core.pyt   start_periodic_callbacks   s    c         C` sA   |  j  s= t |  _  |  j d  k	 r= |  j j |  j j  q= n  d  S(   N(   Rk   t   TrueR_   R3   R?   Ri   R6   (   RC   (    (    s/   lib/python2.7/site-packages/distributed/core.pyR6      s    		c         C` st   t    } | |  j } | |  _ | t k rJ t j d t |   j |  n  |  j d  k	 rp |  j d j	 |  n  d  S(   Ns   Event loop was unresponsive in %s for %.2fs.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.s   tick-duration(
   R   Rf   t   tick_maximum_delayt   loggert   infoRK   R   RY   R3   t   add(   RC   t   nowt   diff(    (    s/   lib/python2.7/site-packages/distributed/core.pyRg      s    		
c         C` s   t    | d <t | t  rW x_ | D]- } |  j | j |  |  j | c d 7<q# Wn' |  j | j |  |  j | c d 7<d  S(   NR   i   (   R   t
   isinstancet   listRZ   t   appendR[   (   RC   t   namet   msgt   n(    (    s/   lib/python2.7/site-packages/distributed/core.pyt	   log_event  s    c         C` s@   |  j  s9 |  j d k r' t d   n  |  j j |  _  n  |  j  S(   s>   
        The address this Server can be contacted on.
        s(   cannot get address of non-running ServerN(   RS   R_   R3   t
   ValueErrort   contact_address(   RC   (    (    s/   lib/python2.7/site-packages/distributed/core.pyt   address  s
    	c         C` s@   |  j  s9 |  j d k r' t d   n  |  j j |  _  n  |  j  S(   s   
        The address this Server is listening on.  This may be a wildcard
        address such as `tcp://0.0.0.0:1234`.
        s/   cannot get listen address of non-running ServerN(   RT   R_   R3   R   t   listen_address(   RC   (    (    s/   lib/python2.7/site-packages/distributed/core.pyR     s
    	c         C` s+   |  j  s$ t |  j  \ } |  _  n  |  j  S(   s   
        The port number this Server is listening on.

        This will raise ValueError if the Server is listening on a
        non-IP based protocol.
        (   RU   R   R   (   RC   t   _(    (    s/   lib/python2.7/site-packages/distributed/core.pyt   port#  s    	c         C` s   i t  |   j d 6|  j d 6S(   NRK   RR   (   RK   R   RR   (   RC   t   comm(    (    s/   lib/python2.7/site-packages/distributed/core.pyR/   /  s    c         C` s   | d  k r |  j } n  t | t  r< t |  j |  } n9 t | t  rZ t |   } n | } t | t  su t  t	 | |  j
 d |  j d | |  _ |  j j   d  S(   NRW   t   connection_args(   R3   t   default_portR{   t   intR   t
   default_ipt   tupleR   t   AssertionErrorR   t   handle_commRW   R_   Rq   (   RC   t   port_or_addrt   listen_argst   addr(    (    s/   lib/python2.7/site-packages/distributed/core.pyR   2  s    		c         c` sg  |  j  r | j   d S| j } d } t j d | t |   j  | |  j | <zxt	 ry$ | j
   V} t j d | |  Wnr t k
 r } |   s t j d | | |  n  Pn= t k
 r } t j |  | j t | d d  VqU n Xt | t  st d t |    n  y | j d  } Wn' t k
 rXt d	 t |    n X|  j d k	 r|  j d j |  n  | |  j | <| j d
 d  } | j d t  } | j d t	  }	 | d k r|	 r| j d  Vn  Pn  d }
 y\ | |  j k r=d } t | j d | d t |   j   } t |  } n |  j | } Wn3 t k
 rt j d | t |   j d t	 n X| d k	 rt | d
  r| | d
 <n  t j d | j  yB | | |  }
 t |
  t  j! k r|  j" j |
  |
 V}
 n  Wnp t# t$ f k
 r?} |  j% d k r;t j& d | |  n  Pn2 t k
 rp} t j |  t | d d }
 n X|	 r|
 d k ry | j |
 d
 | VWqt t f k
 r} t j d | | |  PqXn  d } }
 | r| j'   Vn  | j(   rU PqU qU WWd |  j | =|   rb| j(   rby | j   Wqbt k
 r^} t j) d | |  qbXn  Xd S(   s   Dispatch new communications to coroutine-handlers

        Handlers is a dictionary mapping operation names to functions or
        coroutines.

            {'get_data': get_data,
             'ping': pingpong}

        Coroutines should expect a single Comm object.
        Ns   Connection from %r to %ss   Message from %r: %ssC   Lost connection to %r while reading message: %s. Last operation: %st   statuss   uncaught-errors(   Bad message type.  Expected dict, got
  t   ops.   Received unexpected message without 'op' key: t   serializerst   closet   replyt   OKs^   The '{op}' handler has been explicitly disallowed in {obj}, possibly due to security concerns.t   objs   No handler %s found in %st   exc_infos   Calling into handler %st   runnings   Lost connection to %r: %ss
   dont-replys8   Lost connection to %r while sending result for op %r: %ss)   Failed while closing connection to %r: %s(*   Rk   t   abortt   peer_addressR3   Rv   t   debugRK   R   RV   Rt   t   readt   EnvironmentErrort	   Exceptiont	   exceptiont   writet   error_messageR{   Rd   t	   TypeErrorRO   t   popt   KeyErrorR   RX   Rx   Rj   RM   t   formatR*   RF   t   warningR   R	   t   FutureR^   R   R   R   Rw   R   t   closedt   error(   RC   R   R   R   R   R   t   eR   t   close_desiredR   t   resultt   _msgR(   t   handler(    (    s/   lib/python2.7/site-packages/distributed/core.pyR   D  s    	
			
'

c         c` s  | p	 i  } t  j d  d  } t } zLy x | s| j   V} t | t t f  sb | f } n  | j   s x | D]t } | d k r Pn  | j	 d  } | r | d k r t
 } Pn  |  j | }	 |	 t | |    qu t  j d |  qu Wn  x | D] }
 |
   q Wq. WWn` t t f k
 r.} | } nB t k
 ro} t  j |  t rid d  l } | j   n    n XWd  | j   | j   st  Xd  S(   Ns   Starting established connectionR   R   s   close-streams   odd message %si    (   Rv   Rw   R3   Rj   R   R{   R   R|   R   R   Rt   RN   R   R   R   R   R   R   t   LOG_PDBt   pdbt	   set_traceR   R   (   RC   R   t   extrat   every_cyclet   io_errorR   t   msgsR   R   R   t   funcR   R   (    (    s/   lib/python2.7/site-packages/distributed/core.pyRE     sD     		
c         #` s   |  j  j   x/ t d  D]! } |  j s- Pq t j d  Vq Wx |  j D] } | j   qI Wx |  j D]     j   qg WxE t d  D]7 } t	   f d   |  j D  r Pq t j d  Vq Wd  S(   Ni   g?i
   c         3` s   |  ] }   j    Vq d  S(   N(   t	   cancelled(   t   .0t   c(   t   cb(    s/   lib/python2.7/site-packages/distributed/core.pys	   <genexpr>  s    g{Gz?(
   R_   R6   t   rangeRV   R	   t   sleepR   R^   t   cancelt   all(   RC   t   iR   (    (   R   s/   lib/python2.7/site-packages/distributed/core.pyR     s    	N(   R   R    t   __doc__R   R   R3   Rt   Rn   Rs   R6   Rg   R   t   propertyR   R   R   R/   R   R	   Rh   R   R   RE   R   (    (    (    s/   lib/python2.7/site-packages/distributed/core.pyR-   K   s,   #\				
{)c         C` s   d S(   Nt   pong(    (   R   (    (    s/   lib/python2.7/site-packages/distributed/core.pyt   pingpong  s    c   	      k` sH  | } | | d <| j  d  } t } | d k r: | } n  | d k	 rS | | d <n  za y@ |  j | d | d d V| r |  j d |  V} n d } Wn t k
 r t }   n XWd | r |  j   Vn | r |  j   n  Xt	 | t
  r5| j  d  d	 k r5|  j r"t j t |     q5t | d
   n  t j |   d S(   s    Send and recv with a Comm.

    Keyword arguments turn into the message

    response = yield send_recv(comm, op='ping', reply=True)
    R   R   R   t   on_errort   raiset   deserializersNR   s   uncaught-errort   text(   RJ   Rj   R3   R   R   R   Rt   R   R   R{   Rd   RW   t   sixt   reraiset   clean_exceptionR   R	   t   Return(	   R   R   R   R   R'   R   t   please_closet   force_closet   response(    (    s/   lib/python2.7/site-packages/distributed/core.pyt	   send_recv  s4    
	 
$	c         C` sa   |  d  k r | | f }  n | d  k r3 | d  k s9 t  t |  t  rW t |    }  n  t |   S(   N(   R3   R   R{   R   R   R   (   R   t   ipR   (    (    s/   lib/python2.7/site-packages/distributed/core.pyt   addr_from_args*  s    t   rpcc           B` s   e  Z d  Z e j   Z d
 Z d Z d d e	 d d d d d  Z
 e j d    Z d   Z d   Z d   Z d   Z d   Z d   Z d	   Z RS(   s   Conveniently interact with a remote server

    >>> remote = rpc(address)  # doctest: +SKIP
    >>> response = yield remote.add(x=10, y=20)  # doctest: +SKIP

    One rpc object can be reused for several interactions.
    Additionally, this object creates and destroys many comms as necessary
    and so is safe to use in multiple overlapping communications.

    When done, close comms explicitly.

    >>> remote.close_comms()  # doctest: +SKIP
    c         C` s   i  |  _  t |  |  _ | |  _ d |  _ | |  _ | |  _ | d  k	 rN | n | |  _ | |  _	 t
 j   |  _ t j j |   d  S(   NR   (   t   commst   coerce_to_addressR   t   timeoutR   RW   R   R3   R   R   R\   R]   t   _createdR   t   activeRx   (   RC   t   argR   RW   R   R   R   R   (    (    s/   lib/python2.7/site-packages/distributed/core.pyRn   G  s    
						c         c` s   |  j  d k r t d   n  t   } t } xC |  j j   D]2 \ } } | j   re | j |  n  | r= Pq= q= Wx | D] } |  j | =qz W| s | j   r t |  j	 |  j
 d |  j d |  j V} d | _ n  t |  j | <t j |   d S(   s@   Get an open communication

        Some comms to the ip/port target may be in current use by other
        coroutines.  We track this with the `comms` dict

            :: {comm: True/False if open and ready for use}

        This function produces an open communication, either by taking one
        that we've already made or making a new one if they are all taken.
        This also removes comms that have been closed.

        When the caller is done with the stream they should set

            self.comms[comm] = True

        As is done in __getattr__ below.
        R   s
   RPC ClosedRW   R   R   N(   R   R   t   setRj   R   t   itemsR   Rx   R   R   R   RW   R   R~   R	   R   (   RC   t   to_cleart   openR   t   s(    (    s/   lib/python2.7/site-packages/distributed/core.pyt	   live_comm\  s(    			c         C` s   t  j d    } x7 t |  j  D]& } | r" | j   r" | |  q" q" Wx7 t |  j  D]& } | r\ | j   r\ | |  q\ q\ W|  j j   d  S(   Nc         s` sM   y+ |  j  i d d 6t d 6 V|  j   VWn t k
 rH |  j   n Xd  S(   NR   R   R   (   R   Rj   R   R   R   (   R   (    (    s/   lib/python2.7/site-packages/distributed/core.pyt   _close_comm  s
    (   R	   Rh   R|   R   R   R   t   clear(   RC   R   R   (    (    s/   lib/python2.7/site-packages/distributed/core.pyt   close_comms  s    	c         ` s   t  j    f d    } | S(   Nc          ;` s    j  d  k	 r4 |  j d  d  k r4  j  |  d <n   j d  k	 rh |  j d  d  k rh  j |  d <n  y7  j   V} d   | _ t d | d   |   V} Wn2 t t f k
 r } | j	 d |   f   n Xt
  j | <t j |   d  S(   NR   R   s   rpc.R   R   s)   %s: while trying to call remote method %r(   R   R3   RJ   R   R   R~   R   R   R   t	   __class__Rt   R   R	   R   (   R'   R   R   R   (   t   keyRC   (    s/   lib/python2.7/site-packages/distributed/core.pyt   send_recv_from_rpc  s    $$(   R	   Rh   (   RC   R   R   (    (   R   RC   s/   lib/python2.7/site-packages/distributed/core.pyt   __getattr__  s    c         C` s9   |  j  d k r" t j j |   n  d |  _  |  j   d  S(   NR   (   R   R   R   t   discardR   (   RC   (    (    s/   lib/python2.7/site-packages/distributed/core.pyt	   close_rpc  s    	c         C` s   |  S(   N(    (   RC   (    (    s/   lib/python2.7/site-packages/distributed/core.pyt	   __enter__  s    c         G` s   |  j    d  S(   N(   R   (   RC   R&   (    (    s/   lib/python2.7/site-packages/distributed/core.pyt   __exit__  s    c         C` s   |  j  d k r t j j |   d |  _  g  |  j D] } | j   s2 | ^ q2 } | r t j d |  t |   x | D] } | j	   qv Wq n  d  S(   NR   s(   rpc object %s deleted with %d open comms(
   R   R   R   R   R   R   Rv   R   t   lenR   (   RC   R   t
   still_open(    (    s/   lib/python2.7/site-packages/distributed/core.pyt   __del__  s    	(c         C` s   d |  j  t |  j  f S(   Ns   <rpc to %r, %d comms>(   R   R   R   (   RC   (    (    s/   lib/python2.7/site-packages/distributed/core.pyt   __repr__  s    (    N(   R   R    R   R\   R]   R   R   R3   R   Rt   Rn   R	   Rh   R   R   R   R   R   R   R   R   (    (    (    s/   lib/python2.7/site-packages/distributed/core.pyR   4  s&   )						t   PooledRPCCallc           B` sY   e  Z d  Z d d d  Z e d    Z d   Z d   Z d   Z	 d   Z
 d   Z RS(	   sW    The result of ConnectionPool()('host:port')

    See Also:
        ConnectionPool
    c         C` s:   | |  _  | |  _ | |  _ | d  k	 r- | n | |  _ d  S(   N(   R   t   poolR   R3   R   (   RC   R   R   R   R   (    (    s/   lib/python2.7/site-packages/distributed/core.pyRn     s    			c         C` s   |  j  S(   N(   R   (   RC   (    (    s/   lib/python2.7/site-packages/distributed/core.pyR     s    c         ` s   t  j    f d    } | S(   Nc          ;` s    j  d  k	 r4 |  j d  d  k r4  j  |  d <n   j d  k	 rh |  j d  d  k rh  j |  d <n   j j  j  V} | j d   } | _ z t d | d   |   V} Wd   j j	  j |  | | _ Xt
 j |   d  S(   NR   R   s   ConnectionPool.R   R   (   R   R3   RJ   R   R   R   R   R~   R   t   reuseR	   R   (   R'   R   R~   R   (   R   RC   (    s/   lib/python2.7/site-packages/distributed/core.pyR     s    $$
(   R	   Rh   (   RC   R   R   (    (   R   RC   s/   lib/python2.7/site-packages/distributed/core.pyR     s    c         C` s   d  S(   N(    (   RC   (    (    s/   lib/python2.7/site-packages/distributed/core.pyR     s    c         C` s   |  S(   N(    (   RC   (    (    s/   lib/python2.7/site-packages/distributed/core.pyR     s    c         G` s   d  S(   N(    (   RC   R&   (    (    s/   lib/python2.7/site-packages/distributed/core.pyR     s    c         C` s   d |  j  f S(   Ns   <pooled rpc to %r>(   R   (   RC   (    (    s/   lib/python2.7/site-packages/distributed/core.pyR     s    N(   R   R    R   R3   Rn   R   R   R   R   R   R   R   (    (    (    s/   lib/python2.7/site-packages/distributed/core.pyR     s   				t   ConnectionPoolc           B` s   e  Z d  Z e j   Z d e d d d d d d  Z e	 d    Z
 e	 d    Z d   Z d d d d  Z e j d d   Z d   Z d	   Z d
   Z d   Z RS(   s`   A maximum sized pool of Comm objects.

    This provides a connect method that mirrors the normal distributed.connect
    method, but provides connection sharing and tracks connection limits.

    This object provides an ``rpc`` like interface::

        >>> rpc = ConnectionPool(limit=512)
        >>> scheduler = rpc('127.0.0.1:8786')
        >>> workers = [rpc(address) for address ...]

        >>> info = yield scheduler.identity()

    It creates enough comms to satisfy concurrent connections to any
    particular address::

        >>> a, b = yield [scheduler.who_has(), scheduler.has_what()]

    It reuses existing comms so that we don't have to continuously reconnect.

    It also maintains a comm limit to avoid "too many open file handle"
    issues.  Whenever this maximum is reached we clear out all idling comms.
    If that doesn't do the trick then we wait until one of the occupied comms
    closes.

    Parameters
    ----------
    limit: int
        The number of open comms to maintain at once
    deserialize: bool
        Whether or not to deserialize data by default or pass it through
    i   c         C` s   | |  _  t t  |  _ t t  |  _ | |  _ | |  _ | d  k	 rK | n | |  _ | |  _	 | |  _
 t   |  _ | r t j |  n d  |  _ t j   |  _ |  j j |   d  S(   N(   t   limitR   R   t	   availablet   occupiedRW   R   R3   R   R   R   R   t   eventR\   R5   t   serverR]   R   t
   _instancesRx   (   RC   R   RW   R   R   R   R   R   (    (    s/   lib/python2.7/site-packages/distributed/core.pyRn     s    
					c         C` s   t  t t |  j j     S(   N(   t   sumt   mapR   R   Ro   (   RC   (    (    s/   lib/python2.7/site-packages/distributed/core.pyR   7  s    c         C` s#   |  j  t t t |  j j     S(   N(   R   R   R   R   R   Ro   (   RC   (    (    s/   lib/python2.7/site-packages/distributed/core.pyR   ;  s    c         C` s   d |  j  |  j f S(   Ns$   <ConnectionPool: open=%d, active=%d>(   R   R   (   RC   (    (    s/   lib/python2.7/site-packages/distributed/core.pyR   ?  s    c         C` s:   t  d | d | d |  } t | |  d |  j d |  j S(   s    Cached rpc objects R   R   R   R   R   (   R   R   R   R   (   RC   R   R   R   (    (    s/   lib/python2.7/site-packages/distributed/core.pyt   __call__B  s    c         c` sN  |  j  | } |  j | } | rZ | j   } | j   sZ | j |  t j |   qZ n  x; |  j |  j k r |  j	 j
   |  j   |  j	 j   Vq] Wy] t | d | p |  j d |  j d |  j V} d | _ t j |   | _ |  j j |  Wn t k
 r  n X| j |  |  j |  j k r;|  j	 j
   n  t j |   d S(   sE   
        Get a Comm to the given address.  For internal use.
        R   RW   R   R   N(   R   R   R   R   Rx   R	   R   R   R   R   R   t   collectt   waitR   R   RW   R   R~   R\   R5   t   _poolR   R   (   RC   R   R   R   R   R   (    (    s/   lib/python2.7/site-packages/distributed/core.pyR   I  s4    
		c         C` su   y |  j  | j |  Wn t k
 r+ nF X| j   r] |  j |  j k  rq |  j j   qq n |  j | j	 |  d S(   sV   
        Reuse an open communication to the given address.  For internal use.
        N(
   R   t   removeR   R   R   R   R   R   R   Rx   (   RC   R   R   (    (    s/   lib/python2.7/site-packages/distributed/core.pyR   n  s    c         C` s   t  j d |  j |  j  xB |  j j   D]1 \ } } x | D] } | j   q< W| j   q) W|  j |  j k  r |  j	 j
   n  d S(   sV   
        Collect open but unused communications, to allow opening other ones.
        s.   Collecting unused comms.  open: %d, active: %dN(   Rv   Rw   R   R   R   R   R   R   R   R   R   (   RC   R   R   R   (    (    s/   lib/python2.7/site-packages/distributed/core.pyR   }  s    c         C` s   t  j d |  | |  j k rO |  j j |  } x | D] } | j   q8 Wn  | |  j k r |  j j |  } x | D] } | j   qw Wn  |  j |  j k  r |  j j	   n  d S(   s6   
        Remove all Comms to a given address.
        s   Removing comms to %sN(
   Rv   Rw   R   R   R   R   R   R   R   R   (   RC   R   R   R   (    (    s/   lib/python2.7/site-packages/distributed/core.pyR    s    c         C` s   x2 |  j  j   D]! } x | D] } | j   q Wq Wx2 |  j j   D]! } x | D] } | j   qR WqE Wx' |  j D] } t j   j | j  qt Wd S(   s4   
        Close all communications abruptly.
        N(   R   Ro   R   R   R   R
   R`   Ri   (   RC   R   R   (    (    s/   lib/python2.7/site-packages/distributed/core.pyR     s    N(   R   R    R   R\   R]   R   Rt   R3   Rn   R   R   R   R   R   R	   Rh   R   R   R   R  R   (    (    (    s/   lib/python2.7/site-packages/distributed/core.pyR     s&    	$			c         C` s.   t  |  t t f  r$ t |    }  n  t |   S(   N(   R{   R|   R   R   R   (   t   o(    (    s/   lib/python2.7/site-packages/distributed/core.pyR     s    R   c         C` s	  t    } t |  d  } y& t j j |  } t j j |  Wn# t k
 rc t t |   } n Xt j |  } y t j j |  } Wn- t k
 r d j	 t
 j |   } } n Xt |  d k r d } n t j |  } i | d 6| d 6| d 6t |  d 6S(	   s   Produce message to send back given an exception has occurred

    This does the following:

    1.  Gets the traceback
    2.  Truncates the exception and the traceback
    3.  Serializes the exception and traceback or
    4.  If they can't be serialized send string versions
    5.  Format a message and return

    See Also
    --------
    clean_exception: deserialize and unpack message into exception/traceback
    six.reraise: raise exception/traceback
    i  R.   i'  R   R   t	   tracebackR   N(   R   R   R   t   picklet   dumpst   loadsR   RO   t   to_serializet   joinR  t	   format_tbR   R3   (   R   R   t   tbt   e2t   e3t   e4t   tb2t	   tb_result(    (    s/   lib/python2.7/site-packages/distributed/core.pyR     s     	 	c         K` s   t  |  t  s t  |  t  rW y t j j |   }  Wqu t k
 rS t |   }  qu Xn t  |  t  ru t |   }  n  t  | t  r y t j j |  } Wq t t	 f k
 r d } q Xn t  | t  r d } n  t |   |  | f S(   s    Reraise exception and traceback. Deserialize if necessary

    See Also
    --------
    error_message: create and serialize errors into message
    N(   R{   t   bytest	   bytearrayR   R  R  R   RO   R   t   AttributeErrorR3   R   RK   (   R   R  R'   (    (    s/   lib/python2.7/site-packages/distributed/core.pyR     s    	(L   t
   __future__R    R   R   t   collectionsR   R   t   concurrent.futuresR   t	   functoolsR   t   loggingR   R  RP   R\   RH   R   t   toolzR   t   tornadoR	   t   tornado.ioloopR
   t   tornado.locksR   t   compatibilityR   R   R   R   R   R   R   R   t   metricsR   R.   R   t   system_monitorR   t   utilsR   R   R   R   R   R   R   R   t   IOErrorR   t	   getLoggerR   Rv   R%   R*   t   MAX_BUFFER_SIZERI   RJ   Ru   R   t   objectR-   R   Rh   Rt   R3   R   R   R   R   R   R   R   R   (    (    (    s/   lib/python2.7/site-packages/distributed/core.pyt   <module>   sR   .4					 	(
2	%