| |
|
|
| import argparse |
| import os |
| import sys |
| import time |
| import traceback |
| import sklearn |
| import json |
| import tensorflow as tf |
| import keras |
| import keras_nlp |
| import keras.layers as kl |
| from keras.layers import Conv1D, MaxPooling1D, AveragePooling1D |
| from keras_nlp.layers import SinePositionEncoding, TransformerEncoder |
| from keras.layers import BatchNormalization |
| from keras.models import Sequential, Model, load_model |
| from keras.optimizers import Adam |
| from keras.callbacks import EarlyStopping, History, ModelCheckpoint |
| import pandas as pd |
| import numpy as np |
| import matplotlib.pyplot as plt |
| import seaborn as sns |
| from scipy import stats |
| from collections import Counter |
| from itertools import product |
| from sklearn.metrics import mean_squared_error |
|
|
# Record the wall-clock start so main() can report total training time.
startTime = time.time()
# Pin the job to GPU 0; must be set before TensorFlow initializes devices.
# (Removed a duplicate `import os` — os is already imported at the top.)
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
|
|
def parse_arguments():
    """Parse command-line options for the DeepSTARR training script."""
    parser = argparse.ArgumentParser(description='DeepSTARR')
    # (flag, default value, help prefix) — help text gets the default appended.
    option_table = (
        ('--config', 'config/config-conv-117.json', 'Configuration file path'),
        ('--indir', './DeepSTARR-Reimplementation-main/data/Sequences_activity_all.txt', 'Input data directory'),
        ('--out_dir', 'output', 'Output directory'),
        ('--label', 'baseline', 'Output label'),
    )
    for flag, default, text in option_table:
        parser.add_argument(flag, type=str, default=default,
                            help=f'{text} (default: {default})')
    return parser.parse_args()
|
|
def LoadConfig(config):
    """Read the JSON configuration file at *config* and return it as a dict."""
    with open(config) as handle:
        return json.load(handle)
|
|
def one_hot_encode(seq):
    """One-hot encode a DNA sequence into an (len(seq), 4) integer array.

    Columns follow A, C, G, T order; 'N' maps to an all-zero row. The input
    is upper-cased first so lower-case sequences are accepted, matching the
    behavior of kmer_encode(). Any other character raises KeyError.
    """
    nucleotide_dict = {'A': [1, 0, 0, 0],
                       'C': [0, 1, 0, 0],
                       'G': [0, 0, 1, 0],
                       'T': [0, 0, 0, 1],
                       'N': [0, 0, 0, 0]}
    return np.array([nucleotide_dict[nuc] for nuc in seq.upper()])
|
|
def kmer_encode(sequence, k=3):
    """Return the relative frequency of every ACGT k-mer in *sequence*.

    The sequence is upper-cased first; k-mers absent from the sequence get
    frequency 0. Keys cover all 4**k possible k-mers.
    """
    sequence = sequence.upper()
    windows = [sequence[i:i + k] for i in range(len(sequence) - k + 1)]
    counts = Counter(windows)
    return {km: counts[km] / len(windows)
            for km in map(''.join, product('ACGT', repeat=k))}


def kmer_features(seq, k=3):
    """Return a (1, 4**k) feature matrix of k-mer frequencies for *seq*."""
    freqs = kmer_encode(seq, k)
    ordered = [''.join(combo) for combo in product('ACGT', repeat=k)]
    return np.array([[freqs[km] for km in ordered]])
|
|
def prepare_input(data_set, params):
    """Encode sequences and collect the two regression targets.

    Returns (seq_matrix, [Y_dev, Y_hk]) where the sequence encoding is
    selected by params['encode'] ('one-hot' or 'k-mer').
    """
    method = params['encode']
    if method == 'one-hot':
        encoded = data_set['Sequence'].apply(one_hot_encode)
    elif method == 'k-mer':
        encoded = data_set['Sequence'].apply(kmer_features, k=3)
    else:
        raise Exception('wrong encoding method')
    seq_matrix = np.array(encoded.tolist())

    # Two regression targets: developmental and housekeeping enrichment.
    targets = [data_set.Dev_log2_enrichment, data_set.Hk_log2_enrichment]
    return seq_matrix, targets
|
|
def DeepSTARR(params):
    """Build and compile the two-headed DeepSTARR regression model.

    Parameters
    ----------
    params : dict
        Architecture/optimizer configuration (see config/*.json): encoding,
        convolution/transformer/dense layer settings, dropout, and lr.

    Returns
    -------
    (model, params)
        The compiled keras Model predicting [Dev, Hk] log2 enrichment,
        plus the params dict passed through unchanged.
    """
    if params['encode'] == 'one-hot':
        inputs = kl.Input(shape=(249, 4))   # 249-bp sequence x 4 nucleotides
    elif params['encode'] == 'k-mer':
        inputs = kl.Input(shape=(1, 64))    # 64 = 4**3 k-mer frequencies
    else:
        # Was: fell through and crashed later on an undefined `input`.
        raise ValueError('unknown encoding method: ' + str(params['encode']))

    # Convolution stack. BUGFIX: the original applied every Conv1D to the raw
    # input tensor, so with n_layers > 1 only the LAST conv block was actually
    # connected to the rest of the network; layers are now properly chained.
    x = inputs
    for i in range(params['convolution_layers']['n_layers']):
        x = kl.Conv1D(params['convolution_layers']['filters'][i],
                      kernel_size=params['convolution_layers']['kernel_sizes'][i],
                      padding=params['pad'],
                      name=str('Conv1D_' + str(i + 1)))(x)
        x = kl.BatchNormalization()(x)
        x = kl.Activation('relu')(x)
        if params['encode'] == 'one-hot':
            x = kl.MaxPooling1D(2)(x)

        if params['dropout_conv'] == 'yes': x = kl.Dropout(params['dropout_prob'])(x)

    # Transformer encoder stack; positional information is injected once,
    # before the first encoder layer.
    for i in range(params['transformer_layers']['n_layers']):
        if i == 0:
            x = x + keras_nlp.layers.SinePositionEncoding()(x)
        x = TransformerEncoder(intermediate_dim=params['transformer_layers']['attn_key_dim'][i],
                               num_heads=params['transformer_layers']['attn_heads'][i],
                               dropout=params['dropout_prob'])(x)

    x = kl.Flatten()(x)

    # Fully-connected trunk shared by both output heads.
    for i in range(params['n_dense_layer']):
        x = kl.Dense(params['dense_neurons' + str(i + 1)],
                     name=str('Dense_' + str(i + 1)))(x)
        x = kl.BatchNormalization()(x)
        x = kl.Activation('relu')(x)
        x = kl.Dropout(params['dropout_prob'])(x)

    bottleneck = x

    # One linear regression head per task (developmental / housekeeping).
    tasks = ['Dev', 'Hk']
    outputs = []
    for task in tasks:
        outputs.append(kl.Dense(1, activation='linear', name=str('Dense_' + task))(bottleneck))

    model = Model([inputs], outputs)
    model.compile(Adam(learning_rate=params['lr']),
                  loss=['mse', 'mse'],     # one MSE loss per head
                  loss_weights=[1, 1])     # tasks weighted equally

    return model, params
|
|
def train(selected_model, X_train, Y_train, X_valid, Y_valid, params):
    """Fit *selected_model* with early stopping on validation loss.

    Returns (model, history); the best weights seen during training are
    restored on the returned model.
    """
    stopper = EarlyStopping(patience=params['early_stop'],
                            monitor="val_loss",
                            restore_best_weights=True)
    my_history = selected_model.fit(X_train, Y_train,
                                    validation_data=(X_valid, Y_valid),
                                    batch_size=params['batch_size'],
                                    epochs=params['epochs'],
                                    callbacks=[stopper, History()])
    return selected_model, my_history
|
|
def summary_statistics(X, Y, set, task, main_model, main_params, out_dir):
    """Print MSE/PCC/SCC for one output head and return the PCC as a string.

    Parameters
    ----------
    X, Y : model inputs and ground-truth activities for one task.
    set : label used only in the printed report (e.g. "test").
          NOTE(review): shadows the builtin; kept for interface compatibility.
    task : "Dev" or "Hk"; selects which model output head to score.
    main_model : trained model exposing .predict().
    main_params : dict providing 'batch_size'.
    out_dir : unused; kept for interface compatibility.
    """
    # Was: an unknown task left `i` undefined and raised a confusing NameError.
    head = {"Dev": 0, "Hk": 1}
    if task not in head:
        raise ValueError("unknown task: " + str(task))
    # Predict once and pick the head for this task (original recomputed PCC).
    pred = main_model.predict(X, batch_size=main_params['batch_size'])[head[task]].squeeze()
    y = np.asarray(Y)
    mse = float(np.mean((y - pred) ** 2))  # == sklearn mean_squared_error for 1-D
    pcc = stats.pearsonr(Y, pred)[0]
    scc = stats.spearmanr(Y, pred)[0]
    print(set + ' MSE ' + task + ' = ' + str("{0:0.2f}".format(mse)))
    print(set + ' PCC ' + task + ' = ' + str("{0:0.2f}".format(pcc)))
    print(set + ' SCC ' + task + ' = ' + str("{0:0.2f}".format(scc)))
    return str("{0:0.2f}".format(pcc))
| |
def main(config, indir, out_dir, label):
    """Train DeepSTARR on the data in *indir* and write results to *out_dir*.

    Loads the tab-separated sequence/activity table, trains on the
    Train/Val splits, evaluates PCC on the Test split for both tasks, and
    writes final_info.json plus the saved model to *out_dir*.
    """
    data = pd.read_table(indir)
    params = LoadConfig(config)

    X_train, Y_train = prepare_input(data[data['set'] == "Train"], params)
    X_valid, Y_valid = prepare_input(data[data['set'] == "Val"], params)
    X_test, Y_test = prepare_input(data[data['set'] == "Test"], params)

    # Build the model once (the original constructed it three times and
    # discarded two copies, tripling graph-construction work).
    main_model, main_params = DeepSTARR(params)
    main_model.summary()
    main_model, my_history = train(main_model, X_train, Y_train, X_valid, Y_valid, main_params)

    endTime = time.time()
    seconds = endTime - startTime
    print("Total training time:", round(seconds / 60, 2), "minutes")

    dev_results = summary_statistics(X_test, Y_test[0], "test", "Dev", main_model, main_params, out_dir)
    hk_results = summary_statistics(X_test, Y_test[1], "test", "Hk", main_model, main_params, out_dir)

    result = {
        "AutoDNA": {
            "means": {
                "PCC(Dev)": dev_results,
                "PCC(Hk)": hk_results
            }
        }
    }

    # Ensure the output directory exists: the original only created it in
    # the top-level exception handler, so a clean run could crash here.
    os.makedirs(out_dir, exist_ok=True)
    with open(f"{out_dir}/final_info.json", "w") as file:
        json.dump(result, file, indent=4)

    main_model.save(out_dir + '/' + label + '.h5')
|
|
if __name__ == "__main__":
    # Parse outside the try block: in the original, a failure inside
    # parse_arguments() reached the handler with `args` undefined, raising
    # a NameError that masked the real error.
    args = parse_arguments()
    try:
        main(args.config, args.indir, args.out_dir, args.label)
    except Exception:
        print("Original error in subprocess:", flush=True)
        os.makedirs(args.out_dir, exist_ok=True)
        # `with` closes the log file deterministically (was a leaked handle).
        with open(os.path.join(args.out_dir, "traceback.log"), "w") as log:
            traceback.print_exc(file=log)
        raise
|
|
|
|
|
|
|
|
|
|