import basepair
from keras.models import Model, load_model
from basepair.losses import twochannel_multinomial_nll
import os
# NOTE(review): set after the `keras` import but before any model is loaded;
# TF creates its session lazily, so this should still restrict GPU visibility
# — confirm nothing above triggered session creation.
os.environ["CUDA_VISIBLE_DEVICES"] = "2,3"
# The custom loss must be registered so the saved model can be deserialized.
model = load_model("model.h5", custom_objects={"twochannel_multinomial_nll": twochannel_multinomial_nll})
from basepair.utils import read_pkl
# Pickled dataset splits; each split is indexed as (inputs, targets) below
# — presumably (x, y) tuples, verify against the workflow that wrote them.
train,valid,test = read_pkl("/users/avsec/workspace/basepair-workflow/models/0/data.pkl")
preproc = read_pkl("/users/avsec/workspace/basepair-workflow/models/0/preprocessor.pkl")
from basepair.eval import evaluate
# Quick sanity check of the restored model on the validation split.
evaluate(model, valid[0], valid[1])
# Output heads: one per-position profile task and one total-count task per TF.
task_names = ["%s/%s" % (head, tf)
              for head in ("profile", "counts")
              for tf in ("Oct4", "Sox2", "Klf4", "Nanog")]
onehot_data = valid[0]
preds = model.predict(onehot_data)
# First three validation sequences, used for the attribution comparisons below.
to_interpret = onehot_data[[0, 1, 2]]
import keras.backend as K
# Single model input (one-hot sequence tensor) shared by all gradient functions.
inp = model.inputs[0]
# Per-task backend functions computing d(target)/d(input) for the
# positive-strand (channel 0) and negative-strand (channel 1) outputs.
fn_pos = {}
fn_neg = {}
for task_id, task_name in enumerate(task_names):
    if "counts" in task_name:
        # Count heads: one scalar per strand -> plain gradient of that scalar.
        fn_pos[task_name] = K.function([inp], K.gradients(model.outputs[task_id][:, 0], inp))
        fn_neg[task_name] = K.function([inp], K.gradients(model.outputs[task_id][:, 1], inp))
    else:
        # Profile heads: weight each position's logit by its softmax probability
        # (wrapped in stop_gradient) and sum over positions, so gradients flow
        # only through the logits, not through the probability weighting.
        fn_pos[task_name] = K.function([inp], K.gradients(K.sum(K.stop_gradient(K.softmax(
            model.outputs[task_id][:, :, 0])) * model.outputs[task_id][:, :, 0], axis=-1), inp))
        fn_neg[task_name] = K.function([inp], K.gradients(K.sum(K.stop_gradient(K.softmax(
            model.outputs[task_id][:, :, 1])) * model.outputs[task_id][:, :, 1], axis=-1), inp))
import numpy as np
from basepair.data import numpy_minibatch

# Evaluate the gradient functions over the whole validation set in batches.
grads_pos = {}
grads_neg = {}
for task_name in task_names:
    pos_parts = [np.array(fn_pos[task_name]([batch])).squeeze()
                 for batch in numpy_minibatch(valid[0], 512)]
    grads_pos[task_name] = np.concatenate(pos_parts)
    neg_parts = [np.array(fn_neg[task_name]([batch])).squeeze()
                 for batch in numpy_minibatch(valid[0], 512)]
    grads_neg[task_name] = np.concatenate(neg_parts)

# Hypothetical scores = strand gradients summed; actual scores mask them to
# the observed bases. (Mean-centering across the base axis was tried at some
# point and is currently disabled.)
hyp_scores = {}
scores = {}
for task_name in task_names:
    combined = grads_pos[task_name] + grads_neg[task_name]
    hyp_scores[task_name] = combined
    scores[task_name] = combined * valid[0]
def deeplift_score(path_to_model, onehot_data, task_names, gpu=None):
    """Compute DeepLIFT (Rescale) attributions for every task head.

    Args:
        path_to_model: path to a saved Keras model (.h5) trained with
            ``twochannel_multinomial_nll``.
        onehot_data: one-hot encoded sequences to attribute.
        task_names: task identifiers; heads containing "counts" are treated
            as scalar per-strand outputs, all others as profile heads.
        gpu: value for CUDA_VISIBLE_DEVICES; None disables GPU use.

    Returns:
        OrderedDict mapping task name -> attribution array
        (same shape as ``onehot_data``).
    """
    if gpu is not None:
        os.environ['CUDA_VISIBLE_DEVICES'] = gpu
    else:
        # Don't use any GPU's
        os.environ['CUDA_VISIBLE_DEVICES'] = ''
    # Imports are local so GPU visibility is set before TF initializes.
    # (Unused imports of DeepLIFTRescale/dinuc_shuffle/numpy removed.)
    from deepexplain.tensorflow import DeepExplain
    from collections import OrderedDict
    from keras.models import load_model
    import keras.backend as K
    attributions = OrderedDict()
    # Start from a fresh session so DeepExplain can patch the new graph.
    K.clear_session()
    keras_model = load_model(path_to_model, custom_objects={"twochannel_multinomial_nll": twochannel_multinomial_nll})
    with DeepExplain(session=K.get_session()) as de:
        input_tensor = keras_model.layers[0].input
        for task_idx, task in enumerate(task_names):
            # NOTE(review): assumes the last len(task_names) layers are the
            # task output layers, in task_names order — confirm on the model.
            fModel = Model(inputs=input_tensor,
                           outputs=keras_model.layers[task_idx - len(task_names)].output)
            if "counts" in task:
                target_tensor = fModel(input_tensor)
            else:
                # Profile head: weight logits by their stop-gradient softmax
                # probabilities and sum both strands — mirrors the target used
                # for the manual gradient computation elsewhere in this file.
                raw_outputs = fModel(input_tensor)
                target_tensor_pos = K.stop_gradient(K.softmax(raw_outputs[:, :, 0])) * raw_outputs[:, :, 0]
                target_tensor_neg = K.stop_gradient(K.softmax(raw_outputs[:, :, 1])) * raw_outputs[:, :, 1]
                target_tensor = target_tensor_pos + target_tensor_neg
            attributions[task] = de.explain('deeplift', target_tensor, input_tensor, onehot_data)
    return attributions
def deepexplain_gi_score(path_to_model, onehot_data, task_names, gpu=None):
    """Compute gradient*input attributions via DeepExplain for every task head.

    Args:
        path_to_model: path to a saved Keras model (.h5) trained with
            ``twochannel_multinomial_nll``.
        onehot_data: one-hot encoded sequences to attribute.
        task_names: task identifiers; heads containing "counts" are treated
            as scalar per-strand outputs, all others as profile heads.
        gpu: value for CUDA_VISIBLE_DEVICES; None disables GPU use.

    Returns:
        OrderedDict mapping task name -> attribution array
        (same shape as ``onehot_data``).
    """
    if gpu is not None:
        os.environ['CUDA_VISIBLE_DEVICES'] = gpu
    else:
        # Don't use any GPU's
        os.environ['CUDA_VISIBLE_DEVICES'] = ''
    # Imports are local so GPU visibility is set before TF initializes.
    # (Unused imports of DeepLIFTRescale/dinuc_shuffle/numpy removed.)
    from deepexplain.tensorflow import DeepExplain
    from collections import OrderedDict
    from keras.models import load_model
    import keras.backend as K
    attributions = OrderedDict()
    # Start from a fresh session so DeepExplain can patch the new graph.
    K.clear_session()
    keras_model = load_model(path_to_model, custom_objects={"twochannel_multinomial_nll": twochannel_multinomial_nll})
    with DeepExplain(session=K.get_session()) as de:
        input_tensor = keras_model.layers[0].input
        for task_idx, task in enumerate(task_names):
            # NOTE(review): assumes the last len(task_names) layers are the
            # task output layers, in task_names order — confirm on the model.
            fModel = Model(inputs=input_tensor,
                           outputs=keras_model.layers[task_idx - len(task_names)].output)
            if "counts" in task:
                target_tensor = fModel(input_tensor)
            else:
                # Profile head: weight logits by their stop-gradient softmax
                # probabilities and sum both strands — mirrors the target used
                # for the manual gradient computation elsewhere in this file.
                raw_outputs = fModel(input_tensor)
                target_tensor_pos = K.stop_gradient(K.softmax(raw_outputs[:, :, 0])) * raw_outputs[:, :, 0]
                target_tensor_neg = K.stop_gradient(K.softmax(raw_outputs[:, :, 1])) * raw_outputs[:, :, 1]
                target_tensor = target_tensor_pos + target_tensor_neg
            attributions[task] = de.explain('grad*input', target_tensor, input_tensor, onehot_data)
    return attributions
# Attribution scores from the two DeepExplain-based methods.
dl_hyp_scores = deeplift_score("model.h5", to_interpret, task_names, gpu="2,3")
dl_hyp_scores["profile/Oct4"].shape
gi_hyp_scores = deepexplain_gi_score("model.h5", to_interpret, task_names, gpu="2,3")
# Mask hypothetical scores to the observed bases.
gi_scores = {name: gi_hyp_scores[name] * to_interpret for name in task_names}
# Sanity check: DeepExplain's grad*input should match the manual
# Keras-gradient computation on the same first three sequences.
max_diff = np.max(np.abs(gi_scores['profile/Oct4'] - scores['profile/Oct4'][:3]))
print(max_diff)
from concise.utils.plot import seqlogo_fig, seqlogo
import matplotlib.pyplot as plt

# One figure per task: DeepExplain scores (top) vs gradient*input (bottom).
for task in task_names:
    fig, (upper, lower) = plt.subplots(2, 1, sharex=True, figsize=(20, 6))
    upper.set_title(task + " de scores")
    seqlogo(scores[task][0], ax=upper)
    lower.set_title(task + " gi scores")
    seqlogo(gi_scores[task][0], ax=lower)

# DeepLIFT hypothetical scores masked to the observed bases.
dl_scores = {name: dl_hyp_scores[name] * to_interpret for name in task_names}

from concise.utils.plot import seqlogo_fig, seqlogo
import matplotlib.pyplot as plt

# One figure per task: masked DeepLIFT scores vs hypothetical scores.
for task in task_names:
    fig, (upper, lower) = plt.subplots(2, 1, sharex=True, figsize=(20, 6))
    upper.set_title(task + " dl scores")
    seqlogo(dl_scores[task][0], ax=upper)
    lower.set_title(task + " dl hyp_scores")
    seqlogo(dl_hyp_scores[task][0], ax=lower)
def ism_score(path_to_model, onehot_data, task_names):
    """In-silico mutagenesis (ISM) attribution scores.

    Every position of every sequence is mutated to each of the 4 bases, the
    model is evaluated on all mutants, and each task's per-base scores are
    mean-centered across the 4-base axis at every position.

    Args:
        path_to_model: path to a saved Keras model (.h5) trained with
            ``twochannel_multinomial_nll``.
        onehot_data: one-hot encoded sequences, shape (N, seqlen, 4).
        task_names: task identifiers; heads containing "counts" are summed
            directly, all others use a softmax-weighted logit sum.

    Returns:
        OrderedDict mapping task name -> list of (seqlen, 4) score arrays,
        one per input sequence.
    """
    from collections import OrderedDict
    from basepair.math import softmax
    attribs = OrderedDict()
    K.clear_session()
    keras_model = load_model(path_to_model, custom_objects={"twochannel_multinomial_nll": twochannel_multinomial_nll})
    # Enumerate all single-base mutants, flat-ordered by (sample, position, base).
    mutated_seqs = []
    for sample in onehot_data:
        for pos in range(len(sample)):
            for base in range(4):
                mutated = sample.copy()
                # Zero the row in place and set the mutant base
                # (was: assigning a throwaway np.zeros((1, 4)) via broadcasting).
                mutated[pos, :] = 0
                mutated[pos, base] = 1
                mutated_seqs.append(mutated)
    # One predict call over all mutants.
    raw_predictions = keras_model.predict(np.array(mutated_seqs), batch_size=32)
    for task_idx, task in enumerate(task_names):
        attribs[task] = []
        for sample_idx, sample in enumerate(onehot_data):
            seqlen = len(sample)  # loop-invariant, hoisted
            temp_attribs = []
            for pos in range(seqlen):
                temp_attribs.append([])
                for base in range(4):
                    # Index into the flat (sample, position, base) ordering above.
                    relevant_output = raw_predictions[task_idx][(sample_idx * seqlen * 4) + (pos * 4) + base]
                    if "counts" in task:
                        temp_attribs[pos].append(np.sum(relevant_output))
                    else:
                        # Softmax-weighted logit sum across strands and positions
                        # (NOTE(review): softmax axis comes from basepair.math — confirm).
                        temp_attribs[pos].append(np.sum(softmax(relevant_output) * relevant_output))
            temp_attribs = np.array(temp_attribs)
            # Mean-center across the 4-base (ACGT) axis.
            avg_scores = np.mean(temp_attribs, axis=1, keepdims=True)
            temp_attribs -= avg_scores
            attribs[task].append(temp_attribs)
    return attribs
to_interpret.shape
# Hypothetical ISM scores, then masked to the observed bases.
hyp_ism_scores = ism_score("model.h5", to_interpret, task_names)
ism_scores = {}
for task_name in task_names:
    print(np.array(hyp_ism_scores[task_name]).shape)
    ism_scores[task_name] = hyp_ism_scores[task_name] * to_interpret
# One figure per task: masked ISM scores vs hypothetical ISM scores.
for task in task_names:
    fig, (upper, lower) = plt.subplots(2, 1, sharex=True, figsize=(20, 6))
    upper.set_title(task + " ism scores")
    seqlogo(ism_scores[task][0], ax=upper)
    lower.set_title(task + " ism hyp_scores")
    seqlogo(hyp_ism_scores[task][0], ax=lower)