Spaces:
Build error
Build error
| # import numpy as np | |
| # from sklearn.decomposition import PCA | |
| # from sklearn.manifold import TSNE | |
| # from src.classifiers_classic_ml import visualize_embeddings, train_and_evaluate_model | |
| import os | |
| import pandas as pd | |
| import pytest | |
| from sklearn.datasets import make_classification | |
| from sklearn.metrics import accuracy_score, f1_score | |
| from sklearn.model_selection import train_test_split | |
| from sklearn.preprocessing import LabelEncoder | |
| from tensorflow.keras.layers import BatchNormalization, Concatenate, Dense, Dropout | |
| from tensorflow.keras.losses import CategoricalCrossentropy | |
| from tensorflow.keras.models import Model | |
| from tensorflow.keras.optimizers import SGD, Adam | |
| from src.classifiers_mlp import MultimodalDataset, create_early_fusion_model, train_mlp | |
| #################################################################################################### | |
| ##################################### Test the Keras MLP Models #################################### | |
| #################################################################################################### | |
@pytest.fixture
def correlated_sample_data():
    """
    Fixture that builds a small synthetic multi-class dataset for the tests.

    The data comes from ``make_classification`` with 20 samples, 8 features
    (6 informative) and 3 classes. The first four columns are named
    ``text_0``..``text_3`` and the last four ``image_4``..``image_7`` so that
    they can stand in for text and image features; the target is stored in
    the ``class_id`` column.

    Returns:
        train_df (pd.DataFrame): DataFrame with train data (70% of the rows).
        test_df (pd.DataFrame): DataFrame with test data (30% of the rows).
    """
    # Fixed random_state keeps the generated samples reproducible across runs
    X, y = make_classification(
        n_samples=20, n_features=8, n_informative=6, n_classes=3, random_state=42
    )
    # Rename features to simulate text and image columns
    feature_names = [f"text_{i}" for i in range(4)] + [
        f"image_{i}" for i in range(4, 8)
    ]
    # Create a DataFrame and attach the class labels
    df = pd.DataFrame(X, columns=feature_names)
    df["class_id"] = y
    # Split into train and test sets (seeded, so the split is reproducible)
    train_df, test_df = train_test_split(df, test_size=0.3, random_state=42)
    return train_df, test_df
@pytest.fixture
def label_encoder(correlated_sample_data):
    """
    Fixture that returns a LabelEncoder fitted on the training labels.

    Args:
        correlated_sample_data: The ``(train_df, test_df)`` pair produced by
            the ``correlated_sample_data`` fixture.

    Returns:
        LabelEncoder: Encoder fitted on the ``class_id`` column of the
        training DataFrame.
    """
    train_df, _ = correlated_sample_data
    # Use a distinct local name so the fixture function is not shadowed
    encoder = LabelEncoder()
    encoder.fit(train_df["class_id"])
    return encoder
def test_multimodal_dataset_image_only(correlated_sample_data, label_encoder):
    """
    Build a MultimodalDataset from image columns only.

    Checks that only the image branch is populated and that the first batch
    carries image features plus labels of matching length.
    """
    train_df, _ = correlated_sample_data
    # Only the image-like features image_4..image_7 are handed to the dataset
    image_columns = [f"image_{i}" for i in range(4, 8)]
    dataset = MultimodalDataset(
        train_df,
        text_cols=None,
        image_cols=image_columns,
        label_col="class_id",
        encoder=label_encoder,
    )
    # The image branch must exist while the text branch stays empty
    assert dataset.image_data is not None, "Image data should be instantiated"
    assert dataset.text_data is None, "Text data should be None"
    # Indexing with 0 yields the first (inputs, labels) pair
    inputs, labels = dataset[0]
    assert "image" in inputs, "Batch should contain image data"
    assert "text" not in inputs, "Batch should not contain text data"
    image_batch = inputs["image"]
    expected_width = len(image_columns)
    assert image_batch.shape[1] == expected_width, "Image data shape is incorrect"
    assert labels is not None, "Batch should contain labels"
    assert labels.shape[0] == image_batch.shape[0], (
        "Labels should match the batch size"
    )
def test_multimodal_dataset_text_only(correlated_sample_data, label_encoder):
    """
    Build a MultimodalDataset from text columns only.

    Checks that only the text branch is populated and that the first batch
    carries text features plus labels of matching length.
    """
    train_df, _ = correlated_sample_data
    # Only the text-like features text_0..text_3 are handed to the dataset
    text_columns = [f"text_{i}" for i in range(4)]
    dataset = MultimodalDataset(
        train_df,
        text_cols=text_columns,
        image_cols=None,
        label_col="class_id",
        encoder=label_encoder,
    )
    # The text branch must exist while the image branch stays empty
    assert dataset.text_data is not None, "Text data should be instantiated"
    assert dataset.image_data is None, "Image data should be None"
    # Indexing with 0 yields the first (inputs, labels) pair
    inputs, labels = dataset[0]
    assert "text" in inputs, "Batch should contain text data"
    assert "image" not in inputs, "Batch should not contain image data"
    text_batch = inputs["text"]
    expected_width = len(text_columns)
    assert text_batch.shape[1] == expected_width, "Text data shape is incorrect"
    assert labels is not None, "Batch should contain labels"
    assert labels.shape[0] == text_batch.shape[0], (
        "Labels should match the batch size"
    )
def test_multimodal_dataset_multimodal(correlated_sample_data, label_encoder):
    """
    Build a MultimodalDataset from both text and image columns.

    Checks that both branches are populated and that the first batch holds
    text features, image features and labels with a common batch size.
    """
    train_df, _ = correlated_sample_data
    # text_0..text_3 feed the text branch, image_4..image_7 the image branch
    text_columns = [f"text_{i}" for i in range(4)]
    image_columns = [f"image_{i}" for i in range(4, 8)]
    dataset = MultimodalDataset(
        train_df,
        text_cols=text_columns,
        image_cols=image_columns,
        label_col="class_id",
        encoder=label_encoder,
    )
    # Both modalities must be present on the dataset
    assert dataset.text_data is not None, "Text data should be instantiated"
    assert dataset.image_data is not None, "Image data should be instantiated"
    # Indexing with 0 yields the first (inputs, labels) pair
    inputs, labels = dataset[0]
    assert "text" in inputs, "Batch should contain text data"
    assert "image" in inputs, "Batch should contain image data"
    text_batch, image_batch = inputs["text"], inputs["image"]
    assert text_batch.shape[1] == len(text_columns), "Text data shape is incorrect"
    assert image_batch.shape[1] == len(image_columns), (
        "Image data shape is incorrect"
    )
    assert labels is not None, "Batch should contain labels"
    # Labels, text rows and image rows must all agree on the batch size
    assert labels.shape[0] == text_batch.shape[0] == image_batch.shape[0], (
        "Labels should match the batch size"
    )
def test_create_early_fusion_model_single_modality_image():
    """
    Build the early-fusion model with an image input only.

    Verifies the model type, its input/output shapes and the mix of Dense,
    Dropout and BatchNormalization layers.
    """
    image_input_size, output_size = 4, 3
    # Passing None as the text input size requests a model without text input
    model = create_early_fusion_model(
        None, image_input_size, output_size, hidden=[128, 64], p=0.3
    )
    assert isinstance(model, Model), "Model should be a Keras Model instance"
    # A single input of width image_input_size, one output unit per class
    assert model.input_shape == (None, image_input_size), (
        "Input shape should match image input size"
    )
    assert model.output_shape == (None, output_size), (
        "Output shape should match number of classes"
    )
    # Tally the layers of each kind in one pass per layer type
    n_dense, n_dropout, n_batchnorm = [
        sum(isinstance(layer, kind) for layer in model.layers)
        for kind in (Dense, Dropout, BatchNormalization)
    ]
    assert n_dense == 3, "There should be 3 Dense layers (2 hidden + 1 output)"
    assert n_dropout > 0, "There should be at least 1 Dropout layers"
    assert n_batchnorm > 0, "There should be at least 1 BatchNormalization layer"
def test_create_early_fusion_model_single_modality_text():
    """
    Build the early-fusion model with a text input only.

    Verifies the model type, its input/output shapes and the mix of Dense,
    Dropout and BatchNormalization layers.
    """
    text_input_size, output_size = 4, 3
    # Passing None as the image input size requests a model without image input
    model = create_early_fusion_model(
        text_input_size, None, output_size, hidden=[128, 64], p=0.3
    )
    assert isinstance(model, Model), "Model should be a Keras Model instance"
    # A single input of width text_input_size, one output unit per class
    assert model.input_shape == (None, text_input_size), (
        "Input shape should match text input size"
    )
    assert model.output_shape == (None, output_size), (
        "Output shape should match number of classes"
    )
    # Tally the layers of each kind in one pass per layer type
    n_dense, n_dropout, n_batchnorm = [
        sum(isinstance(layer, kind) for layer in model.layers)
        for kind in (Dense, Dropout, BatchNormalization)
    ]
    assert n_dense == 3, "There should be 3 Dense layers (2 hidden + 1 output)"
    assert n_dropout > 0, "There should be at least 1 Dropout layers"
    assert n_batchnorm > 0, "There should be at least 1 BatchNormalization layer"
def test_create_early_fusion_model_multimodal():
    """
    Build the early-fusion model with both a text and an image input.

    Verifies the model type, the pair of input shapes, the output shape, the
    presence of a Concatenate layer and the mix of Dense, Dropout and
    BatchNormalization layers.
    """
    text_input_size, image_input_size, output_size = 4, 4, 3
    model = create_early_fusion_model(
        text_input_size, image_input_size, output_size, hidden=[128, 64], p=0.3
    )
    assert isinstance(model, Model), "Model should be a Keras Model instance"
    # Two inputs are expected, text first and image second
    expected_inputs = [(None, text_input_size), (None, image_input_size)]
    assert model.input_shape == expected_inputs, (
        "Input shape should match both text and image input sizes"
    )
    assert model.output_shape == (None, output_size), (
        "Output shape should match number of classes"
    )
    # The two inputs have to be joined by a Concatenate layer
    has_concat = any(isinstance(layer, Concatenate) for layer in model.layers)
    assert has_concat, "There should be a Concatenate layer for text and image inputs"
    # Tally the layers of each kind in one pass per layer type
    n_dense, n_dropout, n_batchnorm = [
        sum(isinstance(layer, kind) for layer in model.layers)
        for kind in (Dense, Dropout, BatchNormalization)
    ]
    assert n_dense == 3, "There should be 3 Dense layers (2 hidden + 1 output)"
    assert n_dropout > 0, "There should be at least 1 Dropout layers"
    assert n_batchnorm > 0, "There should be at least 1 BatchNormalization layer"
def test_train_mlp_single_modality_image(correlated_sample_data, label_encoder):
    """
    Call train_mlp on image features only and inspect the returned model.

    train_mlp is invoked with train_model=False and test_mlp_model=False; the
    assertions cover the returned model's loss, optimizer and input/output
    shapes.
    """
    train_df, test_df = correlated_sample_data
    # Image-like features image_4..image_7
    image_columns = [f"image_{i}" for i in range(4, 8)]

    def make_dataset(frame):
        # Train and test splits share the same image-only configuration
        return MultimodalDataset(
            frame,
            text_cols=None,
            image_cols=image_columns,
            label_col="class_id",
            encoder=label_encoder,
        )

    train_dataset = make_dataset(train_df)
    test_dataset = make_dataset(test_df)
    image_input_size = len(image_columns)
    output_size = len(label_encoder.classes_)
    # Only the model is inspected; the three returned metrics are ignored
    model, _accuracy, _f1, _macro_auc = train_mlp(
        train_loader=train_dataset,
        test_loader=test_dataset,
        text_input_size=None,
        image_input_size=image_input_size,
        output_size=output_size,
        num_epochs=1,
        set_weights=True,
        adam=True,
        patience=10,
        save_results=False,
        train_model=False,
        test_mlp_model=False,
    )
    assert model is not None, "Model should not be None after training."
    # The loss may be exposed as a loss object or as its string alias
    assert (
        isinstance(model.loss, CategoricalCrossentropy)
        or model.loss == "categorical_crossentropy"
    ), f"Loss function should be categorical crossentropy, but got {model.loss}"
    # A single image input and one output unit per encoded class
    assert model.input_shape == (None, image_input_size), (
        "Input shape should match image input size"
    )
    assert model.output_shape == (None, output_size), (
        "Output shape should match number of classes"
    )
    # Either supported optimizer class is accepted
    assert isinstance(model.optimizer, (Adam, SGD)), (
        f"Optimizer should be Adam or SGD, but got {model.optimizer}"
    )
def test_train_mlp_single_modality_text(correlated_sample_data, label_encoder):
    """
    Call train_mlp on text features only and inspect the returned model.

    train_mlp is invoked with train_model=False and test_mlp_model=False; the
    assertions cover the returned model's loss, optimizer and input/output
    shapes.
    """
    train_df, test_df = correlated_sample_data
    # Text-like features text_0..text_3
    text_columns = [f"text_{i}" for i in range(4)]

    def make_dataset(frame):
        # Train and test splits share the same text-only configuration
        return MultimodalDataset(
            frame,
            text_cols=text_columns,
            image_cols=None,
            label_col="class_id",
            encoder=label_encoder,
        )

    train_dataset = make_dataset(train_df)
    test_dataset = make_dataset(test_df)
    text_input_size = len(text_columns)
    output_size = len(label_encoder.classes_)
    # Only the model is inspected; the three returned metrics are ignored
    model, _accuracy, _f1, _macro_auc = train_mlp(
        train_loader=train_dataset,
        test_loader=test_dataset,
        text_input_size=text_input_size,
        image_input_size=None,
        output_size=output_size,
        num_epochs=1,
        set_weights=True,
        adam=True,
        patience=10,
        save_results=False,
        train_model=False,
        test_mlp_model=False,
    )
    assert model is not None, "Model should not be None after training."
    # The loss may be exposed as a loss object or as its string alias
    assert (
        isinstance(model.loss, CategoricalCrossentropy)
        or model.loss == "categorical_crossentropy"
    ), f"Loss function should be categorical crossentropy, but got {model.loss}"
    # A single text input and one output unit per encoded class
    assert model.input_shape == (None, text_input_size), (
        "Input shape should match text input size"
    )
    assert model.output_shape == (None, output_size), (
        "Output shape should match number of classes"
    )
    # Either supported optimizer class is accepted
    assert isinstance(model.optimizer, (Adam, SGD)), (
        f"Optimizer should be Adam or SGD, but got {model.optimizer}"
    )
def test_train_mlp_multimodal(correlated_sample_data, label_encoder):
    """
    Call train_mlp on text and image features together and inspect the model.

    train_mlp is invoked with set_weights=True and patience=10, but also with
    train_model=False and test_mlp_model=False; the assertions only cover the
    returned model's loss, optimizer and input/output shapes.
    """
    train_df, test_df = correlated_sample_data
    # text_0..text_3 feed the text branch, image_4..image_7 the image branch
    text_columns = [f"text_{i}" for i in range(4)]
    image_columns = [f"image_{i}" for i in range(4, 8)]

    def make_dataset(frame):
        # Train and test splits share the same two-modality configuration
        return MultimodalDataset(
            frame,
            text_cols=text_columns,
            image_cols=image_columns,
            label_col="class_id",
            encoder=label_encoder,
        )

    train_dataset = make_dataset(train_df)
    test_dataset = make_dataset(test_df)
    text_input_size, image_input_size = len(text_columns), len(image_columns)
    output_size = len(label_encoder.classes_)
    # Only the model is inspected; the three returned metrics are ignored
    model, _accuracy, _f1, _macro_auc = train_mlp(
        train_loader=train_dataset,
        test_loader=test_dataset,
        text_input_size=text_input_size,
        image_input_size=image_input_size,
        output_size=output_size,
        num_epochs=1,
        set_weights=True,
        adam=True,
        patience=10,
        save_results=False,
        train_model=False,
        test_mlp_model=False,
    )
    assert model is not None, "Model should not be None after training."
    # The loss may be exposed as a loss object or as its string alias
    assert (
        isinstance(model.loss, CategoricalCrossentropy)
        or model.loss == "categorical_crossentropy"
    ), f"Loss function should be categorical crossentropy, but got {model.loss}"
    # Two inputs are expected, text first and image second
    expected_inputs = [(None, text_input_size), (None, image_input_size)]
    assert model.input_shape == expected_inputs, (
        "Input shape should match both text and image input sizes"
    )
    assert model.output_shape == (None, output_size), (
        "Output shape should match number of classes"
    )
    # Either supported optimizer class is accepted
    assert isinstance(model.optimizer, (Adam, SGD)), (
        f"Optimizer should be Adam or SGD, but got {model.optimizer}"
    )
# The saved result CSVs must exist and expose the expected columns
def test_result_files():
    """
    Verify that the three per-modality result CSVs exist and are well formed.

    The files are looked up under ``../results`` relative to this test file.
    Each must be non-empty and contain the "Predictions" and "True Labels"
    columns.
    """
    # Resolve paths against this file's directory, not the working directory
    test_dir = os.path.dirname(os.path.abspath(__file__))
    result_paths = [
        os.path.join(test_dir, "../results/multimodal_results.csv"),
        os.path.join(test_dir, "../results/text_results.csv"),
        os.path.join(test_dir, "../results/image_results.csv"),
    ]
    missing_messages = [
        "Multimodal result file is missing!",
        "Text result file is missing!",
        "Image result file is missing!",
    ]
    # All three files have to be present before any of them is parsed
    for path, message in zip(result_paths, missing_messages):
        assert os.path.exists(path), message
    # Each file must parse as CSV, hold rows and carry both required columns
    for file_path in result_paths:
        frame = pd.read_csv(file_path)
        assert not frame.empty, f"{file_path} is empty!"
        has_columns = all(
            column in frame.columns for column in ("Predictions", "True Labels")
        )
        assert has_columns, f"{file_path} is not in the correct format!"
# Accuracy and macro F1 of the saved predictions must exceed fixed thresholds
def test_model_performance():
    """
    Verify that saved predictions beat the accuracy and F1 thresholds.

    Accuracy and macro-averaged F1 are computed from the "True Labels" and
    "Predictions" columns of each result CSV under ``../results`` (relative
    to this test file) and compared against per-modality thresholds.
    """
    # Resolve paths against this file's directory, not the working directory
    test_dir = os.path.dirname(os.path.abspath(__file__))

    def load_results(relative_path):
        # Read one result CSV located relative to the test directory
        return pd.read_csv(os.path.join(test_dir, relative_path))

    def score(results):
        # Return (accuracy, macro F1) for one result table
        truth, predicted = results["True Labels"], results["Predictions"]
        return (
            accuracy_score(truth, predicted),
            f1_score(truth, predicted, average="macro"),
        )

    multimodal_results = load_results("../results/multimodal_results.csv")
    text_results = load_results("../results/text_results.csv")
    image_results = load_results("../results/image_results.csv")
    # Thresholds as (accuracy, F1) pairs; the image bar is the lowest
    multimodal_accuracy_threshold, multimodal_f1_threshold = 0.85, 0.80
    text_accuracy_threshold, text_f1_threshold = 0.85, 0.80
    image_accuracy_threshold, image_f1_threshold = 0.75, 0.70
    # Compute every metric before asserting anything
    multimodal_accuracy, multimodal_f1 = score(multimodal_results)
    text_accuracy, text_f1 = score(text_results)
    image_accuracy, image_f1 = score(image_results)
    # Multimodal performance
    assert multimodal_accuracy > multimodal_accuracy_threshold, (
        f"Multimodal accuracy is below {multimodal_accuracy_threshold}"
    )
    assert multimodal_f1 > multimodal_f1_threshold, (
        f"Multimodal F1 score is below {multimodal_f1_threshold}"
    )
    # Text performance
    assert text_accuracy > text_accuracy_threshold, (
        f"Text accuracy is below {text_accuracy_threshold}"
    )
    assert text_f1 > text_f1_threshold, f"Text F1 score is below {text_f1_threshold}"
    # Image performance
    assert image_accuracy > image_accuracy_threshold, (
        f"Image accuracy is below {image_accuracy_threshold}"
    )
    assert image_f1 > image_f1_threshold, (
        f"Image F1 score is below {image_f1_threshold}"
    )
if __name__ == "__main__":
    # Run pytest when this file is executed as a script and hand pytest's
    # exit code to the interpreter, so a failing run exits non-zero instead
    # of always exiting with status 0.
    raise SystemExit(pytest.main())