ó
ĶÕ\c           @` sō   d  Z  d d l m Z m Z m Z d d l Z d d l m Z d d l m	 Z	 d d l
 Z
 d d l
 m Z m Z d d l m Z d d	 l m Z d d
 l m Z m Z d   Z e   Z d a e e  Z e   Z d   Z d d d d  Z d S(   s2   
A threaded shared-memory scheduler

See local.py
i    (   t   absolute_importt   divisiont   print_functionN(   t   defaultdict(   t
   ThreadPool(   t   current_threadt   Locki   (   t   config(   t	   get_async(   t   inct   addc           C` s
   t    j S(   N(   R   t   ident(    (    (    s,   lib/python2.7/site-packages/dask/threaded.pyt   _thread_get_id   s    c         C` s   |  t  j   d f S(   Ni   (   t   syst   exc_info(   t   et   dumps(    (    s,   lib/python2.7/site-packages/dask/threaded.pyt   pack_exception   s    c         K` s  | p t  j d d  } | p- t  j d d  } t   } t  | d k rÏ | d k r | t k r t d k r| t   a n  t } qÏ | t k rē | t | k rē t | | } qÏ t |  } | t | | <n  Wd QXt	 | j
 t | j  |  | d | d t d t | } t v t t j    } | t k	 rxP t t  D]? }	 |	 | k r=x* t j |	  j   D] }
 |
 j   qeWq=q=Wn  Wd QX| S(   sE   Threaded cached implementation of dask.get

    Parameters
    ----------

    dsk: dict
        A dask dictionary specifying a workflow
    result: key or list of keys
        Keys corresponding to desired data
    num_workers: integer of thread count
        The number of threads to use in the ThreadPool that will actually execute tasks
    cache: dict-like (optional)
        Temporary storage of results

    Examples
    --------

    >>> dsk = {'x': 1, 'y': 2, 'z': (inc, 'x'), 'w': (add, 'z', 'y')}
    >>> get(dsk, 'w')
    4
    >>> get(dsk, ['w', 'y'])
    (4, 2)
    t   poolt   num_workersNt   cachet   get_idR   (   R   t   gett   NoneR   t
   pools_lockt   main_threadt   default_poolR   t   poolsR   t   apply_asynct   lent   _poolR   R   t   sett	   threadingt	   enumeratet   listt   popt   valuest   close(   t   dskt   resultR   R   R   t   kwargst   threadt   resultst   active_threadst   tt   p(    (    s,   lib/python2.7/site-packages/dask/threaded.pyR   !   s0    		(   t   __doc__t
   __future__R    R   R   R   t   collectionsR   t   multiprocessing.poolR   R    R   R   t    R   t   localR   t
   utils_testR	   R
   R   R   R   R   t   dictR   R   R   R   (    (    (    s,   lib/python2.7/site-packages/dask/threaded.pyt   <module>   s    				