import basepair
import keras
import deepexplain
import comet_ml
from basepair.config import create_tf_session
from gin_train.metrics import ClassificationMetrics
import keras.backend as K
import tensorflow as tf
def id_fn(x):
    """Identity function: return `x` unchanged (used as a Lambda-layer body)."""
    return x
def named_tensor(x, name):
    """Pass `x` through an identity Lambda layer so the output op carries `name`.

    Useful for labeling output tensors so that a dict of targets keyed by
    name can be passed to `Model.fit`.
    """
    identity_layer = kl.Lambda(id_fn, name=name)
    return identity_layer(x)
class BinaryClassificationHead:
    """Binary-classification output head attached to a shared model body.

    `net` maps the bottleneck tensor to a single pre-activation value;
    a sigmoid is applied on top. Optionally a learned linear term computed
    from an extra "bias" input is added before the sigmoid. The names of
    the pre-/post-activation tensors are remembered so that they can be
    looked up again by name for interpretation (e.g. DeepLIFT).
    """

    def __init__(self, net,
                 loss='binary_crossentropy',
                 loss_weight=0.5,
                 metric=ClassificationMetrics(),
                 target='{task}/profile',
                 # bias input
                 use_bias=False,
                 bias_input='bias/{task}/profile',
                 bias_shape=(2,),
                 ):
        """
        Args:
          net: callable mapping the bottleneck tensor to the pre-activation tensor
          loss: Keras loss (name or callable) for this head
          loss_weight: weight of this head's loss in the combined loss
          metric: metric object used at evaluation time
          target: target-name template; '{task}' is substituted per task
          use_bias: if True, add a learned linear bias term from an extra input
          bias_input: name template for the extra bias input
          bias_shape: shape of the bias input (excluding the batch axis)
        """
        # NOTE(review): the `metric` default is evaluated once at class
        # definition time and shared across instances — kept for
        # backward compatibility.
        self.net = net
        self.loss = loss
        self.loss_weight = loss_weight
        self.metric = metric
        self.target = target
        self.bias_input = bias_input
        self.use_bias = use_bias
        self.bias_shape = bias_shape

    def neutral_bias_input(self, task, length):
        """Create a dummy (all-zeros) bias input for `task` with `length` rows."""
        # local import: numpy is only imported later in this (notebook-style) file
        import numpy as np
        return {self.get_bias_input(task): np.zeros((length, ) + self.bias_shape)}

    def get_target(self, task):
        """Resolve the target tensor name for `task`."""
        return self.target.format(task=task)

    def get_bias_input(self, task):
        """Resolve the bias input name for `task`."""
        return self.bias_input.format(task=task)

    def __call__(self, inp, task):
        """Build the head on top of the bottleneck tensor `inp`.

        Returns:
          (named output tensor, list of extra bias Input tensors)
        """
        o = self.net(inp)
        # remember the tensors useful for interpretation (referred by name)
        self.pre_act = o.name
        # add the target bias
        if self.use_bias:
            binp = kl.Input(self.bias_shape, name=self.get_bias_input(task))
            bias_inputs = [binp]
            # add the (learned, linear) bias term before the sigmoid
            bias_x = kl.Dense(1)(binp)
            o = kl.add([o, bias_x])
        else:
            bias_inputs = []
        o = kl.Activation("sigmoid")(o)
        self.post_act = o.name
        # label the target op so that we can use a dictionary of targets
        # to train the model
        return named_tensor(o, name=self.get_target(task)), bias_inputs

    def get_preact_tensor(self, graph=None):
        """Look up the pre-activation tensor by name (default graph when None)."""
        if graph is None:
            graph = tf.get_default_graph()
        return graph.get_tensor_by_name(self.pre_act)

    def intp_tensors(self, preact_only=False, graph=None):
        """Return the required interpretation tensors as {key: tensor}.

        Keys: 'logit' (pre-activation) and, unless `preact_only`,
        'p' (post-sigmoid probability).
        """
        if graph is None:
            graph = tf.get_default_graph()
        if preact_only:
            return {"logit": graph.get_tensor_by_name(self.pre_act)}
        else:
            return {"logit": graph.get_tensor_by_name(self.pre_act),
                    "p": graph.get_tensor_by_name(self.post_act)}

    def get_intp_tensor(self, which='logit'):
        """Return a single interpretation tensor by key ('logit' or 'p')."""
        # BUG FIX: previously called the non-existent `interpretation_tensors()`
        return self.intp_tensors()[which]

    def copy(self):
        """Return a deep copy (used to give each task its own head instance)."""
        from copy import deepcopy
        return deepcopy(self)
from keras.optimizers import Adam
from collections import OrderedDict, defaultdict
from copy import deepcopy
from keras.models import Model
from kipoi.data_utils import numpy_collate_concat
from basepair.data import numpy_minibatch, nested_numpy_minibatch
class SeqModel:
    """Multi-task model interpreting the genome sequence.

    A shared `body` network maps a one-hot sequence input 'seq' of shape
    (batch, length, 4) to a bottleneck tensor; one copy of every head is
    attached per task. Besides training/prediction through the compiled
    Keras model, the class exposes DeepLIFT-based importance scoring over
    the heads' interpretation tensors.
    """

    def __init__(self,
                 body,
                 # will be used for each task
                 heads,
                 tasks,
                 optimizer=Adam(lr=0.004),
                 ):
        """
        1. build the Keras model
        2. compile the Keras model

        Args:
          body: callable mapping the 'seq' input tensor to a bottleneck tensor
          heads: list of head objects; each is copied for every task
          tasks: list of task name strings
          optimizer: Keras optimizer used for compilation
        """
        self.body = body
        self.tasks = tasks
        inp = kl.Input(shape=(None, 4), name='seq')
        bottleneck = body(inp)
        self.bottleneck_name = bottleneck.name  # remember the bottleneck tensor name
        # create different heads
        outputs = []
        self.all_heads = defaultdict(list)
        self.losses = []
        self.loss_weights = []
        self.target_names = []
        bias_inputs = []
        for task in tasks:
            for head in heads:
                # copy so that each (task, head) pair tracks its own tensor names
                head = head.copy()
                self.all_heads[task].append(head)
                out, bias_input = head(bottleneck, task)
                outputs.append(out)
                bias_inputs += bias_input
                self.target_names.append(head.get_target(task))
                self.losses.append(head.loss)
                self.loss_weights.append(head.loss_weight)
        # create and compile the model
        self.model = Model([inp] + bias_inputs, outputs)
        self.model.compile(optimizer=optimizer,
                           loss=self.losses, loss_weights=self.loss_weights)
        # start without any importance function
        self.imp_fns = {}

    def _get_input_tensor(self):
        """Return the sequence input tensor (always the first model input)."""
        return self.model.inputs[0]

    def get_bottleneck_tensor(self, graph=None):
        """Look up the bottleneck tensor by name (default graph when None)."""
        if graph is None:
            graph = tf.get_default_graph()
        return graph.get_tensor_by_name(self.bottleneck_name)

    def bottleneck_model(self):
        """Keras model: sequence input -> bottleneck tensor."""
        return Model(self._get_input_tensor(),
                     self.get_bottleneck_tensor())

    def preact_model(self):
        """Keras model returning every head's pre-activation tensor."""
        outputs = [head.get_preact_tensor()
                   for task, heads in self.all_heads.items()
                   for head in heads]
        return Model(self._get_input_tensor(), outputs)

    def predict_preact(self, seq, batch_size=256):
        """Predict pre-activation values as {target_name: array}."""
        m = self.preact_model()
        preds = m.predict(seq, batch_size=batch_size)
        return {k: v for k, v in zip(self.target_names, preds)}

    def get_intp_tensors(self, preact_only=True):
        """Return [(name, tensor)] interpretation targets across all heads.

        Names take the form '<target>/<key>' (e.g. 'a/profile/logit').
        """
        intp_targets = []
        for task, heads in self.all_heads.items():
            for head in heads:
                for k, v in head.intp_tensors(preact_only=preact_only).items():
                    intp_targets.append((head.get_target(task) + "/" + k, v))
        return intp_targets

    def _imp_deeplift_fn(self, x, name, preact_only=True):
        """Build (and cache) the DeepLIFT explanation functions.

        Args:
          x: example input (list/dict/array); one row is used to let
            DeepExplain compile the explanation graph
          name: which explanation function to return (e.g. 'a/profile/logit')
          preact_only: only explain pre-activation tensors

        Returns:
          the explanation function cached under f"deeplift/{name}"
        """
        k = f"deeplift/{name}"
        if k in self.imp_fns:
            return self.imp_fns[k]
        import deepexplain
        from deepexplain.tensorflow.methods import DeepLIFTRescale
        from deepexplain.tensorflow import DeepExplain
        from deeplift.dinuc_shuffle import dinuc_shuffle
        from collections import OrderedDict
        from keras.models import load_model, Model
        import keras.backend as K
        import numpy as np
        import tempfile
        # DeepExplain needs a fresh session: round-trip the model through
        # disk, clear the session and invalidate all previously cached fns
        self.imp_fns = {}
        with tempfile.NamedTemporaryFile(suffix='.pkl') as temp:
            self.model.save(temp.name)
            K.clear_session()
            self.model = load_model(temp.name)
        # get the interpretation tensors
        intp_names, intp_tensors = list(zip(*self.get_intp_tensors(preact_only)))
        # input_tensor = self._get_input_tensor()
        input_tensor = self.model.inputs
        # a single example (batch of 1) is enough to trace shapes
        if isinstance(x, list):
            x_subset = [ix[:1] for ix in x]
        elif isinstance(x, dict):
            x_subset = [v[:1] for k_, v in x.items()]
        else:
            x_subset = x[:1]
        with deepexplain.tensorflow.DeepExplain(session=K.get_session()) as de:
            fModel = Model(inputs=input_tensor, outputs=intp_tensors)
            target_tensors = fModel(input_tensor)
            # BUG FIX: loop variable used to shadow the `name` argument;
            # renamed to keep the function's parameter intact
            for intp_name, target_tensor in zip(intp_names, target_tensors):
                # input_tensor = fModel.inputs[0]
                print(intp_name)
                print(target_tensor)
                self.imp_fns["deeplift/" + intp_name] = de.explain('deeplift',
                                                                   target_tensor,
                                                                   input_tensor,
                                                                   x_subset)
        return self.imp_fns[k]

    def imp_score(self, x, name, method='deeplift', batch_size=512, preact_only=True):
        """Compute the importance score
        Args:
            x: one-hot encoded DNA sequence
            method: which importance score to use. Currently only 'deeplift'
              is implemented ('grad' and 'ism' are planned per the error message)
            name: which interpretation tensor to explain
            batch_size: minibatch size; None computes everything in one pass
            preact_only: only consider pre-activation tensors
        """
        if method == "deeplift":
            fn = self._imp_deeplift_fn(x, name, preact_only=preact_only)
        else:
            raise ValueError("Please provide a valid importance scoring method: grad, ism or deeplift")

        def input_to_list(input_names, x):
            # normalize x (list / dict / single array) into the list-of-arrays
            # layout expected by the explanation function
            if isinstance(x, list):
                return x
            elif isinstance(x, dict):
                return [x[k] for k in input_names]
            else:
                return [x]
        input_names = self.model.input_names
        assert input_names[0] == "seq"
        if batch_size is None:
            # [0] -> importance scores w.r.t. the sequence input
            return fn(input_to_list(input_names, x))[0]
        else:
            return numpy_collate_concat([fn(input_to_list(input_names, batch))[0]
                                         for batch in nested_numpy_minibatch(x, batch_size=batch_size)])

    def imp_score_all(self, seq, method='deeplift', batch_size=512, preact_only=True):
        """Compute all importance scores
        Args:
            seq: one-hot encoded DNA sequences
            method: 'deeplift'
            batch_size: batch size when computing the importance scores
        Returns:
            dictionary with keys: {task}/{head}/{interpretation_tensor}
            and values with the same shape as `seq` corresponding to importance scores
        """
        return {name: self.imp_score(seq, name, method=method, batch_size=batch_size)
                for name, _ in self.get_intp_tensors(preact_only=preact_only)}

    def predict(self, seq, batch_size=256):
        """Predict and convert the output list to {target_name: array}."""
        preds = self.model.predict(seq, batch_size=batch_size)
        return {k: v for k, v in zip(self.target_names, preds)}

    def save(self, file_path):
        """Save the whole SeqModel (pickled) to a file."""
        from basepair.utils import write_pkl
        write_pkl(self, file_path)

    @classmethod
    def load(cls, file_path):
        """Load a pickled SeqModel from a file."""
        from basepair.utils import read_pkl
        return read_pkl(file_path)

    @classmethod
    def from_mdir(cls, model_dir):
        """Load the model from `<model_dir>/seq_model.pkl`."""
        import os  # BUG FIX: `os` was never imported at module level
        return cls.load(os.path.join(model_dir, 'seq_model.pkl'))
# K.reset_uids()
# K.clear_session()
# sess = K.get_session()
from basepair.utils import write_pkl
from keras.models import Model
from keras.models import Sequential
import keras.layers as kl
# some example layers
class TopDense:
    """Head network: global average pooling followed by a single dense unit.

    Instances are used as callables in the functional model-building API.
    """

    def __init__(self, pool_size=2):
        # kept for API compatibility; the global pooling below does not use it
        self.pool_size = pool_size

    def __call__(self, inp):
        pooled = kl.GlobalAvgPool1D()(inp)
        return kl.Dense(1)(pooled)
class BaseNet:
    """Body network: a single 1D convolution over the one-hot sequence.

    Instances are used as callables in the functional model-building API.
    """

    def __init__(self, activation='relu'):
        self.activation = activation

    def __call__(self, inp):
        conv = kl.Conv1D(16, kernel_size=3, activation=self.activation)
        return conv(inp)
import numpy as np
from concise.preprocessing import encodeDNA
# ---------------------------------------------------------------------------
# NOTE(review): the statements below are exploratory notebook cells exported
# to a script. They contain IPython-only syntax (`fn??`, `%debug`) and several
# stale references, so this section is not runnable as plain Python.
# ---------------------------------------------------------------------------
# test the model
seqs = encodeDNA(['ACAGA']*100)
inputs = {"seq": seqs,
"bias/a/profile": np.random.randint(low=0, high=2, size=(100, 2)).astype(bool),
"bias/b/profile": np.random.randint(low=0, high=2, size=(100, 2)).astype(bool)}
targets = {"a/profile": np.random.randint(low=0, high=2, size=(100, 1)).astype(bool),
"b/profile": np.random.randint(low=0, high=2, size=(100, 1)).astype(bool),
}
# build a two-task model with a biased binary-classification head
m = SeqModel(
body=BaseNet('relu'),
heads=[BinaryClassificationHead(net=TopDense(pool_size=2), use_bias=True)],
tasks=['a', 'b']
)
m.imp_fns
isinstance(m.model.inputs, list)
# compute DeepLIFT scores for the 'a' task logit
m.imp_score_all(list(inputs.values()), preact_only=True, method='deeplift')['a/profile/logit'][0]
fn = m.imp_fns['deeplift/a/profile/logit']
# IPython-only: inspect the cached explanation function
fn??
list(inputs)
[x.shape for x in inputs.values()]
# IPython-only magic
%debug
m.imp_fns['deeplift/a/profile/logit'](list(inputs.values()))
%debug
m.get_intp_tensors()
m.predict_preact(seqs)['a/profile'][:4]
# TODO - can we force some tensors to be 0?
m.all_heads['a'][0].neutral_bias_input('a', 10)
m.model.inputs
m.bottleneck_model().output
m.model.fit(inputs, targets)
# re-use the trained bottleneck as the body of a second model
m2 = SeqModel(
body=m.bottleneck_model(),
heads=[BinaryClassificationHead(net=TopDense(pool_size=2))],
tasks=['a', 'b']
)
m2.model.inputs
m.model.inputs
m2.body.inputs
m2._get_input_tensor()
m2.model.fit(seqs, targets)
# NOTE(review): load before save — presumably run out of order in the notebook
m = SeqModel.load("/tmp/m.pkl")
m.model.save("/tmp/m.pkl")
from keras.models import load_model
inp = kl.Input((None, 4), name='seq')
m.predict(encodeDNA(['ACAGA']))
m.get_intp_tensors()
m._imp_deeplift_fn
m.model.fit(seqs, targets)
K.clear_session()
g = tf.get_default_graph()
g.get_operations()
g = tf.get_default_graph()
g.get_operations()
m = SeqModel.load("/tmp/a.pkl")
t = m.all_heads['a'][0]
# NOTE(review): `get_interpretation_tensor` does not exist on the head class
# (the method is `get_intp_tensor`) — this cell would raise AttributeError
t.get_interpretation_tensor('logit')
# NOTE(review): `interpret_targets` is undefined — NameError if executed
interpret_targets
m.grad_fns = {}
# NOTE(review): `_imp_deeplift_fn` is a method, not a free function, and its
# signature is (self, x, name, ...) — this call matches an older API
fns = _imp_deeplift_fn(m, seqs)
fns['b/profile/logit']([seqs])[0]
m
g = tf.get_default_graph()
g.get_operations()
from basepair.config import create_tf_session
create_tf_session(0)
"""Test sequence model
"""
from basepair.seqmodel import SeqModel
from basepair.heads import ScalarHead, BinaryClassificationHead, ProfileHead
import numpy as np
import keras.layers as kl
class TopDense:
    """Head network used in tests: global average pool + one dense output.

    Callable in the functional model-building style.
    """

    def __init__(self, pool_size=2):
        # stored for API compatibility; unused by GlobalAvgPool1D
        self.pool_size = pool_size

    def __call__(self, inp):
        x = kl.GlobalAvgPool1D()(inp)
        dense = kl.Dense(1)
        return dense(x)
class TopConv:
    """Head network used in tests: a width-1 convolution with `n_output` channels.

    Callable in the functional model-building style.
    """

    def __init__(self, n_output=2):
        self.n_output = n_output

    def __call__(self, inp):
        conv = kl.Conv1D(self.n_output, 1)
        return conv(inp)
class BaseNet:
    """Body network used in tests: one same-padded 1D convolution.

    Callable in the functional model-building style. `padding='same'`
    keeps the sequence length unchanged.
    """

    def __init__(self, activation='relu'):
        self.activation = activation

    def __call__(self, inp):
        conv = kl.Conv1D(16, kernel_size=3, activation=self.activation,
                         padding='same')
        return conv(inp)
def test_interpret_wo_bias():
    """Train a three-head SeqModel (class/counts/profile) on random data and
    check that importance scoring and evaluation produce the expected keys."""
    from basepair.metrics import PeakPredictionProfileMetric
    from gin_train.metrics import RegressionMetrics, ClassificationMetrics
    from concise.preprocessing import encodeDNA
    # random training data for a single task 'a'
    one_hot = encodeDNA(['ACAGA'] * 100)
    inputs = {"seq": one_hot,
              "bias/a/profile": np.random.randn(100, 5, 2)}
    # regression-style targets
    targets = {
        "a/class": np.random.randint(low=0, high=2, size=(100, 1)).astype(float),
        "a/counts": np.random.randn(100),
        "a/profile": np.random.randn(100, 5, 2),
    }
    import keras.backend as K
    # assemble the heads: binary classification, scalar regression, profile
    head_list = [
        BinaryClassificationHead('{task}/class',
                                 net=TopDense(pool_size=2),
                                 use_bias=False),
        ScalarHead('{task}/counts',
                   loss='mse',
                   metric=RegressionMetrics(),
                   net=TopDense(pool_size=2),
                   use_bias=False),
        ProfileHead('{task}/profile',
                    loss='mse',
                    metric=PeakPredictionProfileMetric(),
                    net=TopConv(n_output=2),
                    use_bias=True,
                    # NOTE: the shape currently has to be hard-coded to the sequence length
                    bias_shape=(5, 2)),
    ]
    model = SeqModel(body=BaseNet('relu'), heads=head_list, tasks=['a'])
    model.model.fit(inputs, targets)
    # importance scores should cover the profile head and match the input shape
    scores = model.imp_score_all(one_hot)
    assert 'a/profile/wn' in scores
    assert scores['a/profile/wn'].shape == one_hot.shape
    assert 'a/profile/wn' in scores
    assert scores['a/profile/wn'].shape == one_hot.shape
    # evaluate the dataset -> setup an array dataset (NumpyDataset) -> convert to
    from basepair.data import NumpyDataset
    dataset = NumpyDataset({"inputs": inputs, "targets": targets})
    metrics = model.evaluate(dataset)
    assert 'avg/counts/mad' in metrics
# ---------------------------------------------------------------------------
# NOTE(review): another exported notebook scratch section; duplicates the body
# of test_interpret_wo_bias above and mixes in gin-config experiments.
# ---------------------------------------------------------------------------
from basepair.metrics import PeakPredictionProfileMetric
from gin_train.metrics import RegressionMetrics, ClassificationMetrics
from concise.preprocessing import encodeDNA
# test the model
seqs = encodeDNA(['ACAGA'] * 100)
inputs = {"seq": seqs,
"bias/a/profile": np.random.randn(100, 5, 2)}
# Let's use regression
targets = {"a/class": np.random.randint(low=0, high=2, size=(100, 1)).astype(float),
"a/counts": np.random.randn(100),
"a/profile": np.random.randn(100, 5, 2),
}
import keras.backend as K
# K.clear_session()
# use bias
m = SeqModel(
body=BaseNet('relu'),
heads=[BinaryClassificationHead('{task}/class',
net=TopDense(pool_size=2),
use_bias=False),
ScalarHead('{task}/counts',
loss='mse',
metric=RegressionMetrics(),
net=TopDense(pool_size=2),
use_bias=False),
ProfileHead('{task}/profile',
loss='mse',
metric=PeakPredictionProfileMetric(),
net=TopConv(n_output=2),
use_bias=True,
bias_shape=(5, 2)), # NOTE: the shape currently has to be hard-coded to the sequence length
],
tasks=['a']
)
m.model.fit(inputs, targets)
type(m)
# gin-config experiment: parse bindings for a SeqModel training run
gin_file = '../train/seqmodel/example.gin'
import gin
import gin_train
from gin_train.cli.gin_train import train
gin.parse_config_files_and_bindings([gin_file], bindings='')
# NOTE(review): SeqModel() without arguments would raise TypeError unless all
# constructor arguments are provided via gin bindings
m = SeqModel()
m.save("/tmp/a.pkl")
o = m.imp_score_all(seqs)
assert 'a/profile/wn' in o
assert o['a/profile/wn'].shape == seqs.shape
assert 'a/profile/wn' in o
assert o['a/profile/wn'].shape == seqs.shape
# evaluate the dataset -> setup an array dataset (NumpyDataset) -> convert to
from basepair.data import NumpyDataset
ds = NumpyDataset({"inputs": inputs, "targets": targets})
o = m.evaluate(ds)
assert 'avg/counts/mad' in o