# Interpretation script for a multi-task genomics model: load a trained
# model and its data, evaluate it, then compute attribution scores via
# gradients, DeepLIFT and in-silico mutagenesis (ISM).
import basepair
from keras.models import Model, load_model
from basepair.losses import twochannel_multinomial_nll
import os
# NOTE(review): CUDA_VISIBLE_DEVICES is set *after* keras is imported;
# this only works if no TF session was created at import time — confirm.
os.environ["CUDA_VISIBLE_DEVICES"] = "1"
# The model was trained with a custom loss, so it must be supplied as a
# custom object for deserialization to succeed.
model = load_model("model.h5", custom_objects={"twochannel_multinomial_nll": twochannel_multinomial_nll})
from basepair.utils import read_pkl
# Pre-split dataset and its preprocessor, pickled by the training workflow.
train,valid,test = read_pkl("/users/avsec/workspace/basepair-workflow/models/0/data.pkl")
preproc = read_pkl("/users/avsec/workspace/basepair-workflow/models/0/preprocessor.pkl")
from basepair.eval import evaluate
# Sanity-check performance on the validation split (valid[0]=inputs, valid[1]=targets).
evaluate(model, valid[0], valid[1])
# One profile head and one total-count head per TF, in the order the
# model's outputs are laid out.
tfs = ["Oct4", "Sox2", "Klf4", "Nanog"]
task_names = ["profile/" + tf for tf in tfs] + ["counts/" + tf for tf in tfs]
onehot_data = valid[0]
preds = model.predict(onehot_data)
# First three validation sequences are used for interpretation below.
to_interpret = onehot_data[[0, 1, 2]]
import keras.backend as K
inp = model.inputs[0]
# One gradient function per task and per channel (index 0 / 1 of the
# last output axis — presumably the two strands; TODO confirm).
fn_pos = {}
fn_neg = {}
# NOTE(review): assumes model.outputs[task_id] lines up with the order
# of task_names — confirm against the model definition.
for task_id, task_name in enumerate(task_names):
    if "counts" in task_name:
        # Count heads are indexed (batch, channel): take the plain
        # gradient of each scalar channel w.r.t. the input.
        fn_pos[task_name] = K.function([inp], K.gradients(model.outputs[task_id][:, 0], inp))
        fn_neg[task_name] = K.function([inp], K.gradients(model.outputs[task_id][:, 1], inp))
    else:
        # Profile heads: gradient of sum_i softmax(logits)_i * logits_i,
        # where stop_gradient freezes the softmax weights so only the
        # raw logits receive gradient (weighted-logit contribution trick).
        fn_pos[task_name] = K.function([inp], K.gradients(K.sum(K.stop_gradient(K.softmax(
            model.outputs[task_id][:, :, 0])) * model.outputs[task_id][:, :, 0], axis=-1), inp))
        fn_neg[task_name] = K.function([inp], K.gradients(K.sum(K.stop_gradient(K.softmax(
            model.outputs[task_id][:, :, 1])) * model.outputs[task_id][:, :, 1], axis=-1), inp))
import numpy as np
from basepair.data import numpy_minibatch
# Evaluate both channel gradient functions over the whole validation set,
# in minibatches of 512 to keep memory bounded.
grads_pos = {}
grads_neg = {}
for task_name in task_names:
    for fns, dest in ((fn_pos, grads_pos), (fn_neg, grads_neg)):
        per_batch = [np.array(fns[task_name]([batch])).squeeze()
                     for batch in numpy_minibatch(valid[0], 512)]
        dest[task_name] = np.concatenate(per_batch)
# Setup different scores
hyp_scores = {}
scores = {}
for task_name in task_names:
    # Combine both channel gradients, then mean-center across the four
    # bases (last axis) to obtain hypothetical contribution scores.
    combined = grads_pos[task_name] + grads_neg[task_name]
    hyp_scores[task_name] = combined - combined.mean(-1, keepdims=True)
    # Actual scores: project the hypotheticals onto the observed one-hot bases.
    scores[task_name] = hyp_scores[task_name] * valid[0]
def deeplift_score(path_to_model, onehot_data, task_names, gpu=None):
    """Compute DeepLIFT attribution scores for every task head.

    Args:
        path_to_model: path to a saved keras model (.h5); it is loaded
            with the custom twochannel_multinomial_nll loss.
        onehot_data: one-hot encoded sequences handed straight to
            DeepExplain (presumably (n_seqs, seq_len, 4) — TODO confirm).
        task_names: output task names. The i-th task is mapped to
            keras_model.layers[i - len(task_names)], i.e. the last
            len(task_names) layers are assumed to be the output heads
            in task order — TODO confirm against the architecture.
        gpu: CUDA device id as a string; None hides all GPUs.

    Returns:
        OrderedDict mapping task name -> DeepExplain attribution array.
    """
    if gpu is not None:
        os.environ['CUDA_VISIBLE_DEVICES'] = gpu
    else:
        # Don't use any GPU's
        os.environ['CUDA_VISIBLE_DEVICES'] = ''
    # Importing the DeepExplain context also registers the available
    # attribution methods (including 'deeplift').
    from deepexplain.tensorflow import DeepExplain
    from collections import OrderedDict
    from keras.models import load_model
    import keras.backend as K
    attributions = OrderedDict()
    # Fresh session so the device setting takes effect and previously
    # built graphs don't leak into the explanation graph.
    K.clear_session()
    keras_model = load_model(path_to_model,
                             custom_objects={"twochannel_multinomial_nll": twochannel_multinomial_nll})
    with DeepExplain(session=K.get_session()) as de:
        input_tensor = keras_model.layers[0].input
        for task_idx, task in enumerate(task_names):
            # Re-wrap the model up to this task's output layer so
            # DeepExplain attributes that head only.
            fModel = Model(inputs=input_tensor,
                           outputs=keras_model.layers[task_idx - len(task_names)].output)
            target_tensor = fModel(input_tensor)
            attributions[task] = de.explain('deeplift', target_tensor, input_tensor, onehot_data)
    return attributions
# Hypothetical DeepLIFT scores for the selected sequences, on GPU 1.
dl_hyp_scores = deeplift_score("model.h5", to_interpret, task_names, gpu="1")
# Actual scores: total attribution per position projected onto the
# observed (one-hot) bases.
dl_scores = {
    task_name: np.sum(dl_hyp_scores[task_name], axis=-1, keepdims=True) * to_interpret
    for task_name in task_names
}
from concise.utils.plot import seqlogo_fig, seqlogo
import matplotlib.pyplot as plt
# Sequence logos of actual vs. hypothetical DeepLIFT scores for the
# first interpreted sequence of every task.
# (The original enumerate() index was unused — iterate directly.)
for task in task_names:
    fig, (ax0, ax1) = plt.subplots(2, 1, sharex=True, figsize=(20, 6))
    ax0.set_title(task+" dl scores")
    seqlogo(dl_scores[task][0], ax=ax0)
    ax1.set_title(task+" dl hyp_scores")
    seqlogo(dl_hyp_scores[task][0], ax=ax1)
def ism_score(path_to_model, onehot_data, task_names):
    """Compute in-silico mutagenesis (ISM) attribution scores.

    For every sequence and every position, each of the four bases is
    substituted in turn and the model re-run; a position/base score is
    the sum of the absolute values of that task's output for the mutated
    sequence, mean-centered across the four bases.

    Unlike the original implementation — which ran one predict() call
    per position, per base, *per task*, recomputing all heads each time
    — all mutants of a sequence are batched into a single forward pass
    and its outputs are reused for every task. Results are identical.

    Args:
        path_to_model: path to a saved keras model (.h5), loaded with
            the custom twochannel_multinomial_nll loss.
        onehot_data: iterable of one-hot sequences, each of shape
            (seq_len, 4) — assumed from the [pos][base] indexing.
        task_names: output task names, in model-output order.

    Returns:
        OrderedDict mapping task name -> list (one entry per input
        sequence) of mean-centered (seq_len, 4) score arrays.
    """
    from collections import OrderedDict
    attribs = OrderedDict((task, []) for task in task_names)
    K.clear_session()
    keras_model = load_model(path_to_model,
                             custom_objects={"twochannel_multinomial_nll": twochannel_multinomial_nll})
    for sample in onehot_data:
        seq_len = len(sample)
        # All single-base substitutions of this sequence in one array:
        # mutant index pos * 4 + base carries base `base` at `pos`.
        mutants = np.repeat(sample[np.newaxis], seq_len * 4, axis=0)
        for pos in range(seq_len):
            for base in range(4):
                idx = pos * 4 + base
                mutants[idx, pos, :] = 0
                mutants[idx, pos, base] = 1
        # One forward pass yields every task's outputs for every mutant.
        preds = keras_model.predict(mutants)
        for task_idx, task in enumerate(task_names):
            task_preds = np.abs(np.asarray(preds[task_idx]))
            # Per-mutant total |output|, reshaped to (seq_len, 4).
            temp_attribs = task_preds.reshape(seq_len * 4, -1).sum(axis=1).reshape(seq_len, 4)
            # Mean-center across the four bases (hypothetical scores).
            temp_attribs = temp_attribs - temp_attribs.mean(axis=1, keepdims=True)
            attribs[task].append(temp_attribs)
    return attribs
# (Removed a dead `to_interpret.shape` expression statement — a leftover
# notebook inspection with no effect in a script.)
hyp_ism_scores = ism_score("model.h5", to_interpret, task_names)
# Actual ISM scores: per-position totals projected onto observed bases.
ism_scores = {
    task_name: np.sum(hyp_ism_scores[task_name], axis=-1, keepdims=True) * to_interpret
    for task_name in task_names
}
# Sequence logos of actual vs. hypothetical ISM scores for the first
# interpreted sequence of every task.
# (The original enumerate() index was unused — iterate directly.)
for task in task_names:
    fig, (ax0, ax1) = plt.subplots(2, 1, sharex=True, figsize=(20, 6))
    ax0.set_title(task+" ism scores")
    seqlogo(ism_scores[task][0], ax=ax0)
    ax1.set_title(task+" ism hyp_scores")
    seqlogo(hyp_ism_scores[task][0], ax=ax1)