o
    UݢgP/                     @  sD  d Z ddlmZ ddlZddlZddlmZmZmZm	Z	 ddl
mZmZmZ ddlZddlmZ ddlmZ ddlmZ d]ddZd^d_ddZd`ddZedaddZedbddZdcddZddd$d%Zded*d+Z	,dfdgd1d2Zdhd5d6Zdidjd:d;Z dkd?d@Z!dldFdGZ"dmdIdJZ#dndNdOZ$dodSdTZ%	d^dpdWdXZ&dqd[d\Z'dS )rzAssorted grab-bag of miscellaneous helper methods.

Do not add to this module.  Instead, find or create a module with a name
that indicates to a potential user what sorts of methods they might find
in that module.
    )annotationsN)
Collection	GeneratorIterableSequence)IOAnyStroverload)FilteredBarcodesbarcodebytes | Nonereturn
int | Nonec                 C  s6   | du rdS |  d}|dkrt| |d d S dS )zMethod to get just the gem group from a barcode.

    Args:
        barcode (bytes): a barcode, a la "ACGTACTAGAC-1"

    Returns:
        The Gem group as an int or None if not present.
    N   -r      )indexintr   	gem_group r   _/oak/stanford/groups/akundaje/marinovg/programs/cellranger-9.0.1/lib/python/cellranger/utils.pyget_gem_group_from_barcode   s   	
r   bytesr   c                 C  s   |d ur
| d| 7 } | S )Ns   -%dr   r   r   r   r   format_barcode_seq,   s   r   barcode_seqsCollection[bytes]
gem_groupsIterable[int] | Nonelist[bytes]c                   s&   |du r S  fddt t|D S )zFormat a sequence of barcodes as seqs.

    Args:
        barcode_seqs (list[bytes]): _description_
        gem_groups (Optional[Iterable[int]]): _description_

    Returns:
        list[bytes]: _description_
    Nc                   s    g | ]} D ]}t ||qqS r   )r   ).0Zggbcr   r   r   
<listcomp>@   s     z'format_barcode_seqs.<locals>.<listcomp>)sortedset)r   r   r   r!   r   format_barcode_seqs2   s   r%   Nonetuple[None, None]c                 C     d S Nr   r   r   r   r   split_barcode_seqC      r+   tuple[bytes, int | None]c                 C  r(   r)   r   r*   r   r   r   r+   I   r,   c                 C  s\   | du rdS t | tsJ | jddd}|d } t|dkr(t|d }| |fS d}| |fS )zSplit a barcode-gem_group.

    Args:
        barcode (bytes): A barcode with an optional gem group suffix.

    Returns:
        _type_: _description_
    N)NNr      )maxsplitr   r   )
isinstancer   splitlenr   )r   Zbarcode_partsr   r   r   r   r+   O   s   	bcsIterable[bytes]	gg_id_mapdict[int, str | bytes]list[str | bytes]c                   s    fdd| D }|S )a  Turn list of barcodes into corresponding list of mapped values.

    Args:
        bcs (list): aggr barcodes with suffix corresponding to gem group id
        gg_id_map (dict): mapping gem-group-ids/barcode-suffixes (int) to
            a desired named value (typically `aggr_id` for the library id of that
            one sample, or `batch_id` for the name of a batch it is a part of.)
    c                   s   g | ]
} t |d   qS )r   )r+   )r   r    r5   r   r   r"   r   s    z)bcs_suffices_to_names.<locals>.<listcomp>r   )r3   r5   Z
mapped_idsr   r8   r   bcs_suffices_to_namesg   s   r9   searchAnyStr | NonegenomesSequence[AnyStr]c                 C  sZ   t |dksJ | du rdS t |dkr|d S |D ]}| |r%|  S qt|  d)zReturn the first genome which is a prefix for the given search string.

    Args:
        search: _description_
        genomes: _description_

    Raises:
        Exception: _description_

    Returns:
        _type_: _description_
    r   Nr   & does not have valid associated genome)r2   
startswith
ValueError)r:   r<   genomer   r   r   get_genome_from_strv   s   
rB   Fseqr   prefixes_as_genomesboolc                 C  s   t |dksJ | du rdS t |dkr| S tdd |D }|D ]}| |r@|r6| dt | d   S | d| d   S q!t|  d)a  _summary_.

    Args:
        seq (_type_): _description_
        genomes (_type_): _description_
        prefixes_as_genomes (bool, optional): _description_. Defaults to False.

    Raises:
        ValueError: _description_

    Returns:
        _type_: _description_
    r   Nr   c                 s  s    | ]}t |V  qd S r)   )r2   )r   gr   r   r   	<genexpr>       z)remove_genome_from_str.<locals>.<genexpr>r>   )r2   maxr?   r@   )rC   r<   rD   max_lenrA   r   r   r   remove_genome_from_str   s   
rK   readtGenerator[tuple[bytes | None, bytes, int, bytes | None] | tuple[bytes | None, None, None, bytes | None], None, None]c                 c  s    | sdS |  dD ]V}t|dkrq
| d}t|dkrq
|d r'|d nd}|d r@|d dd }t|d dd }nd}d}|d rL|d nd}|tjv sY|du sYJ ||||fV  q
dS )a  Iterate over all transcripts compatible with the given read.

    We do this by iterating over the TX tag entries that are of the form
    `TX:<transcript id>,<strand><position>,<cigar>`.

    Note:
        When intronic alignment is turned on, the `TX` tag is set to
        `TX:<gene id>,<strand>` for intronic  reads or exonic reads that are
        not compatible with annotated splice junctions. We ignore these.
    N   ;r      ,   r   r.   )r1   r2   r   cr_constantsSTRANDS)rL   xpartschromstrandposZcigarstringr   r   r   get_read_transcripts_iter   s&   
rX   @   bitsr   c                 C  sb   t | |d d ksJ d}tt | D ]}| ||d  }|tjv s%J |d> tj| B }q|S )zPack a DNA sequence (no Ns!) into a 2-bit format, into a python int.

    Args:
        seq (str): A DNA sequence.
        bits: the bit size of the integer to pack into.

    Returns:
        int: The sequence packed into the bits of an integer.
    r.   r   r   )r2   rangetk_seqNUCS_INVERSE)rC   rZ   resultinucr   r   r   compress_seq   s   
ra   in_filenames#Iterable[str | bytes | os.PathLike]dictc              	   C  sn   i }| D ]0}|du rqz t |}t|}|| W d   n1 s%w   Y  W q ty4   Y qw |S )zAMerge a list of json files and return the result as a dictionary.N)openjsonloadupdateOSError)rb   r^   filenamefdatar   r   r   merge_jsons_as_dict   s   

rm   library_prefixstrrA   region	read_typec                 C  s$   | |||g}| d d|}|S )a#  Formats the barcode summary into a key to access the `barcode_summary.h5` outs.

    Here, we need to accommodate both accessing old and new versions of the h5 file
    compatible with both v2 and v3 of the matrix.

    Args:
        library_prefix: Name of the library, used as a prefix
        genome: Name of the genome used for alignment
        region: Name of the subset of the genome we are looking at (e.g. transcriptome, regulome,
            epigenome, ...). This should be a controlled vocabulary in `cellranger.constants`
        read_type: Name of the read types we are trying to extract (e.g. `conf_mapped_deduped_barcoded`, ...).
            It should be also controlled in `cellranger.constants`.

    Returns:
        output_key: A string constant with the suffix `reads` appended.
    Zreads_)appendjoin)rn   rA   rp   rq   Zstr_listZ
output_keyr   r   r   format_barcode_summary_h5_key  s   

ru   *Generator[tuple[tuple, tuple], None, None]c                 #  s    t | dkr	dS t | d dkrdS |D ]}t |t |d ks#J q| D ]}t |t |d ks4J q&tjtdd |D }t|}t|d}t |d |d< t||D ]\ tfdd|D t fdd| D fV  qZdS )a  Group a collection of numpy arrays by key arrays.

    Yields `(key_tuple, view_tuple)` where `key_tuple` is the key grouped
    on and `view_tuple` is a tuple of views into the value arrays.

    Args:
        values (tuple of arrays): tuple of arrays to group.
        keys (tuple): tuple of sorted, numeric arrays to group by.

    Returns:
        sequence of tuple: Sequence of (`key_tuple`, `view_tuple`).
    r   Nc                 s  s*    | ]}t d gt |fdkV  qdS )r   r   N)npconcatenatediffr   keyr   r   r   rG   =  s   ( z numpy_groupby.<locals>.<genexpr>c                 3  s    | ]}|  V  qd S r)   r   rz   )group_startr   r   rG   D  rH   c                 3  s    | ]	}|  V  qd S r)   r   )r   valueZ	group_endr}   r   r   rG   D  s    
)r2   rw   
logical_orreducetupleflatnonzerorollzip)valueskeysZ	key_arrayZvalue_arrayZkey_change_indicesZgroup_startsZ
group_endsr   r   r   numpy_groupby#  s(   
"r   rk   	IO[bytes]*Generator[tuple[bytes, bytes], None, None]c                 c  sd    d}d}| D ]}|  }|dr"|r||fV  |dd }d}q||7 }q|r0||fV  dS dS )zIterate through sequences in a fasta file.

    Args:
        f (IO[bytes]): The input file object.

    Yields:
        tuple[bytes, bytes]: _description_
           >r   N)stripr?   )rk   ZhdrrC   liner   r   r   get_fasta_iterI  s   	


r   barcode_csvstr | bytes | os.PathLikedict[bytes, list[bytes]]c                 C  s,   t | tr	|  } dd t|   D S )z$Load a csv file of (genome,barcode).c                 S  s   i | ]	\}}|  |qS r   )encode)r   rA   barcodesr   r   r   
<dictcomp>e  s    z$load_barcode_csv.<locals>.<dictcomp>)r0   r   decoder
   Zper_genome_barcodesitems)r   r   r   r   load_barcode_csva  s
   
r   barcode_csv_filename
set[bytes]c                 C  sP   t |tr	| }t| }t }| D ]\}}|du s ||kr%|| q|S )a  Get set of cell-associated barcode strings.

    Args:
      barcode_csv_filename: TODO
      genome (bytes): Only get cell-assoc barcodes for this genome. If None, disregard genome.

    Returns:
      set of bytes: Cell-associated barcode strings (seq and gem-group).
    N)r0   ro   r   r   r$   r   rh   )r   rA   Zcell_bcs_per_genomeZcell_bcsrF   r3   r   r   r   get_cell_associated_barcode_setk  s   

r   input_stringbytes | strc                 C  sh   t | trz| jdd} W n
 ty   Y dS w t | tr2z
| jddd W dS  ty1   Y dS w dS )zReturns true if the string can be encoded as ascii.

    Input strings are often stored as ascii in numpy arrays, and we need
    to check that this conversion works.
    strict)errorsFasciiT)r0   r   r   UnicodeDecodeErrorro   r   UnicodeEncodeError)r   r   r   r   string_is_ascii  s   

r   )r   r   r   r   r)   )r   r   r   r   r   r   )r   r   r   r   r   r   )r   r&   r   r'   )r   r   r   r-   )r   r   )r3   r4   r5   r6   r   r7   )r:   r;   r<   r=   r   r;   )F)rC   r   r<   r=   rD   rE   r   r;   )rL   r   r   rM   )rY   )rC   r   rZ   r   r   r   )rb   rc   r   rd   )
rn   ro   rA   ro   rp   ro   rq   ro   r   ro   )r   rv   )rk   r   r   r   )r   r   r   r   )r   r   rA   r   r   r   )r   r   r   rE   )(__doc__
__future__r   rf   oscollections.abcr   r   r   r   typingr   r   r	   numpyrw   cellranger.constants	constantsrQ   
tenkit.seqrC   r\   cellranger.fast_utilsr
   r   r   r%   r+   r9   rB   rK   rX   ra   rm   ru   r   r   r   r   r   r   r   r   r   <module>   s@   





%-



&
