| | import tensorflow as tf |
| | from tensorflow.keras import layers, Model |
| | import numpy as np |
| |
|
class DeCorrelationLoss(tf.keras.layers.Layer):
    """DeCov regularizer (Cogswell et al., 2016) as an identity pass-through layer.

    Adds ``lambda_decov * 0.5 * ||C - diag(C)||_F^2`` to the model's losses,
    where ``C`` is the sample covariance of the activations over the batch
    dimension. Penalizing the off-diagonal entries discourages co-adapted
    (redundant) hidden features. The forward pass returns ``inputs`` unchanged.
    """

    def __init__(self, lambda_decov=1e-4, **kwargs):
        """Args:
            lambda_decov: Weight of the decorrelation penalty added via add_loss().
        """
        super().__init__(**kwargs)
        self.lambda_decov = lambda_decov

    def call(self, inputs):
        # inputs: (batch, features) activations — assumed rank 2; TODO confirm
        # with callers (MalConv feeds the dense-layer output here).
        batch_size = tf.cast(tf.shape(inputs)[0], tf.float32)

        # Center per feature before computing the covariance.
        centered = inputs - tf.reduce_mean(inputs, axis=0, keepdims=True)

        # Unbiased sample covariance; guard the denominator so a batch of a
        # single sample does not divide by zero and poison the loss with Inf/NaN.
        denom = tf.maximum(batch_size - 1.0, 1.0)
        covariance = tf.matmul(centered, centered, transpose_a=True) / denom

        # Zero the diagonal: only cross-feature covariances are penalized.
        off_diagonal = covariance - tf.linalg.diag(tf.linalg.diag_part(covariance))

        decov_loss = 0.5 * tf.reduce_sum(tf.square(off_diagonal))
        self.add_loss(self.lambda_decov * decov_loss)

        # Identity on the forward path; only the side-effect loss matters.
        return inputs
| |
|
class MalConv(Model):
    """MalConv-style model for raw-byte binary classification.

    Pipeline: byte embedding -> two parallel 1D convolutions forming a gated
    linear unit (ReLU features * sigmoid gate) -> global max pooling -> dense
    layer (optionally DeCov-regularized) -> dropout -> single sigmoid output.
    """

    def __init__(self,
                 max_input_length=2_000_000,
                 embedding_size=8,
                 filter_size=500,
                 stride=500,
                 num_filters=128,
                 fc_size=128,
                 use_decov=True,
                 lambda_decov=1e-4,
                 **kwargs):
        """Build all sub-layers.

        Args:
            max_input_length: Maximum number of input bytes (stored for
                reference; the layers themselves accept variable length).
            embedding_size: Dimension of the learned byte embedding.
            filter_size: Convolution kernel width in bytes.
            stride: Convolution stride; equal to filter_size here, so the
                windows are non-overlapping.
            num_filters: Filters in each convolution branch.
            fc_size: Width of the fully connected layer.
            use_decov: Whether to attach the DeCov regularizer to the fc output.
            lambda_decov: DeCov penalty weight.
        """
        super().__init__(**kwargs)

        self.max_input_length = max_input_length
        self.use_decov = use_decov

        # One embedding vector per possible byte value (0-255).
        # NOTE: the deprecated `input_length=None` argument was dropped; None
        # is its default and the argument is removed in Keras 3.
        self.embedding = layers.Embedding(
            input_dim=256,
            output_dim=embedding_size,
            mask_zero=False,
            name='byte_embedding'
        )

        # Feature branch of the gated convolution.
        self.conv_A = layers.Conv1D(
            filters=num_filters,
            kernel_size=filter_size,
            strides=stride,
            padding='valid',
            activation='relu',
            name='conv_A'
        )

        # Gate branch: sigmoid output in [0, 1] scales the feature branch.
        self.conv_B = layers.Conv1D(
            filters=num_filters,
            kernel_size=filter_size,
            strides=stride,
            padding='valid',
            activation='sigmoid',
            name='conv_B'
        )

        self.global_max_pool = layers.GlobalMaxPooling1D(name='global_max_pool')

        self.fc = layers.Dense(fc_size, activation='relu', name='fc_layer')

        if use_decov:
            self.decov_layer = DeCorrelationLoss(lambda_decov=lambda_decov)

        self.dropout = layers.Dropout(0.5, name='dropout')
        self.output_layer = layers.Dense(1, activation='sigmoid', name='output')

    def call(self, inputs, training=None):
        """Forward pass.

        Args:
            inputs: Integer byte ids, presumably shape (batch, length) —
                TODO confirm against the data pipeline.
            training: Standard Keras training flag, forwarded to dropout.

        Returns:
            Sigmoid malware probability, shape (batch, 1).
        """
        x = self.embedding(inputs)

        # Gated linear unit. FIX: the original called layers.multiply(...)
        # here, which constructs a brand-new Keras layer object on every
        # forward pass (untracked, name-clashing). tf.multiply is a plain op.
        gated_conv = tf.multiply(self.conv_A(x), self.conv_B(x))

        pooled = self.global_max_pool(gated_conv)
        fc_out = self.fc(pooled)

        # Identity layer that registers the DeCov penalty via add_loss().
        if self.use_decov:
            fc_out = self.decov_layer(fc_out)

        # Dropout is a no-op unless training is truthy, so it is safe (and
        # correct for tensor-valued `training`) to call it unconditionally.
        fc_out = self.dropout(fc_out, training=training)

        return self.output_layer(fc_out)
| |
|
def create_malconv_model(max_input_length=2_000_000):
    """Build and compile a MalConv model with the reference training setup.

    Optimizer: SGD with Nesterov momentum 0.9 and a staircase exponential
    learning-rate decay (0.01 initial, x0.96 every 1000 steps). Loss is
    binary cross-entropy; metrics are accuracy, precision, recall, and AUC.

    Args:
        max_input_length: Maximum input byte length, forwarded to MalConv.

    Returns:
        A compiled MalConv instance ready for fit().
    """
    lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
        initial_learning_rate=0.01,
        decay_steps=1000,
        decay_rate=0.96,
        staircase=True,
    )

    sgd = tf.keras.optimizers.SGD(
        learning_rate=lr_schedule,
        momentum=0.9,
        nesterov=True,
    )

    eval_metrics = [
        'accuracy',
        tf.keras.metrics.Precision(name='precision'),
        tf.keras.metrics.Recall(name='recall'),
        tf.keras.metrics.AUC(name='auc'),
    ]

    model = MalConv(max_input_length=max_input_length)
    model.compile(
        optimizer=sgd,
        loss='binary_crossentropy',
        metrics=eval_metrics,
    )
    return model
| |
|