B
    T\                 @   sF  d dl mZmZmZ d dlZd dlZd dlZd dlZd dlm	Z	 d dl
Z
ddlmZ ddlmZ ddlmZ ddlmZmZ d	d
 Zeeeje dd Ze
jZdd ZG dd deZe Zdd Z y*d dl!Z"e"j#$  d dl%m&Z& dd Z'W n& e(k
r   dd Z'dd Z&Y nX dd Z)dZ*dd Z+d#dd Z,d!d" Z-dS )$    )absolute_importdivisionprint_functionN)warn   )config)copyreg)	get_async)fusecullc             C   s   t | j| jffS )N)getattr__objclass____name__)m r   3lib/python3.7/site-packages/dask/multiprocessing.py_reduce_method_descriptor   s    r   c             C   s   t j| tjdS )N)Zprotocol)cloudpickledumpspickleZHIGHEST_PROTOCOL)xr   r   r   _dumps   s    r   c               C   s
   t  jS )N)multiprocessingZcurrent_processZidentr   r   r   r   _process_get_id    s    r   c               @   s0   e Zd ZdZdd Zdd Zdd Zdd	 Zd
S )RemoteExceptionzY Remote Exception

    Contains the exception and traceback from a remotely run task
    c             C   s   || _ || _d S )N)	exception	traceback)selfr   r   r   r   r   __init__6   s    zRemoteException.__init__c             C   s   t | jd | j S )Nz

Traceback
---------
)strr   r   )r   r   r   r   __str__:   s    zRemoteException.__str__c             C   s(   t ttt| t| j t| j S )N)sortedsetdirtypelist__dict__r   )r   r   r   r   __dir__@   s    zRemoteException.__dir__c             C   s.   yt | |S  tk
r(   t| j|S X d S )N)object__getattribute__AttributeErrorr   r   )r   keyr   r   r   __getattr__E   s    zRemoteException.__getattr__N)r   
__module____qualname____doc__r   r    r'   r,   r   r   r   r   r   1   s
   r   c             C   sr   t | tkr"tt |  }|| |S y6t | jjtt | fdt | i}|tt | < || |S  tk
rl   | S X dS )z8 Metaclass that wraps exception type in RemoteException Zexception_typeN)r$   
exceptions	__class__r   r   	TypeError)exctbtypr   r   r   remote_exceptionO   s    


r6   )reraisec             C   s   | S )Nr   )r4   r   r   r   _pack_tracebackd   s    r8   c             C   s   d t| S )N )joinr   	format_tb)r4   r   r   r   r8   h   s    c             C   s   t | |} | d S )N)r6   )r3   r4   r   r   r   r7   k   s    
r7   c          
   C   sp   t  \}}}t|}y|| |f}W nD tk
rj }  z&t  \}}}t|}|| |f}W d d } ~ X Y nX |S )N)sysexc_infor8   BaseException)er   exc_type	exc_valueexc_tracebackr4   resultr   r   r   pack_exceptionp   s    rD   zThe 'multiprocessing.context' configuration option will be ignored on Python 2
and on Windows, because they each only support a single context.
c              C   sJ   t jdkst jjdkr4tdddk	r0ttt t	S tdd} t	
| S )z, Return the current multiprocessing context.Zwin32   zmultiprocessing.contextN)r<   platformversion_infomajorr   getr   _CONTEXT_UNSUPPORTEDUserWarningr   get_context)Zcontext_namer   r   r   rL      s    
rL   Tc             K   s   |pt dd}|pt dd}|dkrBt }|j|td}d}	nd}	t| |\}
}|rjt|
||\}}n|
}|pt ddpt}|pt ddpt}z.t	|j
t|j||ft||ttd	|}W d|	r|  X |S )
a_   Multiprocessed get function appropriate for Bags

    Parameters
    ----------
    dsk : dict
        dask graph
    keys : object or list
        Desired results from graph
    num_workers : int
        Number of worker processes (defaults to number of cores)
    func_dumps : function
        Function to use for function serialization
        (defaults to cloudpickle.dumps)
    func_loads : function
        Function to use for function deserialization
        (defaults to cloudpickle.loads)
    optimize_graph : bool
        If True [default], `fuse` is applied to the graph before computation.
    poolNnum_workers)ZinitializerTF
func_loads
func_dumps)Zget_idr   loadsrD   Zraise_exception)r   rI   rL   ZPoolinitialize_worker_processr   r
   _loadsr   r	   Zapply_asynclenZ_poolr   rD   r7   close)ZdskkeysrN   rO   rP   Zoptimize_graphrM   kwargscontextZcleanupZdsk2ZdependenciesZdsk3rQ   r   rC   r   r   r   rI      s,    
rI   c              C   s"   t jd} | dk	r| j  dS )zE
    Initialize a worker process before running any tasks in it.
    ZnumpyN)r<   modulesrI   ZrandomZseed)Znpr   r   r   rR      s    rR   )NNNTN).Z
__future__r   r   r   r   r   r   r<   warningsr   r   r9   r   Zcompatibilityr   Zlocalr	   optimizationr
   r   r   r$   r"   unionr   rQ   rS   r   	Exceptionr   dictr0   r6   Ztblib.pickling_supportZtblibZpickling_supportZinstallZdask.compatibilityr7   r8   ImportErrorrD   rJ   rL   rI   rR   r   r   r   r   <module>   s@   
 
9