Created: July 31, 2025
import pandas as pd
import hypernetx as hnx
import random
from itertools import combinations
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import roc_auc_score, roc_curve, precision_score, recall_score, f1_score
from sklearn.model_selection import train_test_split
import os
random.seed(42)
np.random.seed(42)
torch.manual_seed(42)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(42)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
def create_hypergraph(networkFile=None):
# Sample data if no file provided
data = {
"CF1":["P1","J1","L1","T1"],
"CF2":["P1","J2","L1","T2"],
"CF3":["P2","J3","L1","T3"],
"CF4":["P3","J1","L1","T4","T5"],
"CF5":["P3","J3","L1","T6"],
"CF6":["P4","J4","L1","T7"],
"CF7":["P5","J5","L1","T8","T9"],
"CF8":["P6","J5","L1","T5"],
"CF9":["P7","J5","L1","T5"],
"CF10":["P8","J1","L1","T1"],
"CF11":["P9","J5","L1","T10"],
"CF12":["P10","J6","L1","T6"],
"CF13":["P10","J5","L1","T7"],
"CF14":["P11","P12","J1","L1","T8"],
"CF15":["P13","J1","L1","T6"],
"CF16":["P13","J5","L1","T7"],
"CF17":["P13","J1","L1","T1","T2"],
"CF18":["P14","J1","L1","T9"],
"CF19":["P15","J2","L1","T10"],
"CF20":["P15","J4","L1","T11"],
"CF21":["P16","J2","L1","T12"],
"CF22":["P17","J7","L1","T13"],
"CF23":["P18","J4","L1","T14"],
"CF24":["P18","J5","L1","T15"],
"CF25":["P18","J5","L1","T16","T17"],
"CF26":["P18","J4","L1","T18"],
"CF27":["P18","J5","L1","T19"],
"CF28":["P18","J6","L1","T20"],
"CF29":["P19","J1","L1","T1"],
"CF30":["P20","J3","L1","T1"],
"CF31":["P21","J5","L2","T4"],
"CF32":["P22","J1","L1","T1"],
"CF33":["P23","J5","L1","T4"],
"CF34":["P24","J1","L1","T1"],
"CF35":["P25","J4","L3","T21"],
"CF36":["P26","J5","L1","L4","T22"],
"CF37":["P27","J5","L3","T7"],
"CF38":["P28","J1","L3","T1"],
"CF39":["P29","J4","L3","T23"],
"CF40":["P29","J2","L3","T24"],
"CF41":["P30","J1","L3","T4"],
"CF42":["P31","J4","L3","T4"],
"CF43":["P32","J1","L3","T4"],
"CF44":["P33","J4","L3","T4"]
}
    # If a network file is provided, read the hyperedges from it instead of
    # using the sample data above (load_hyperedges_csv is a sketch defined
    # right after this function).
    if networkFile:
        data = load_hyperedges_csv(networkFile)

    H = hnx.Hypergraph(data)
    return H
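# The helper below is only a sketch for the file-reading branch above. The
# CSV layout it assumes (one row per hyperedge: "edge_id,node1;node2;...")
# is an assumption, not a fixed format -- adapt it to your actual file.
def load_hyperedges_csv(path):
    hyperedges = {}
    df = pd.read_csv(path, header=None, names=["edge_id", "nodes"])
    for _, row in df.iterrows():
        # Split the semicolon-separated node list into a Python list
        hyperedges[row["edge_id"]] = row["nodes"].split(";")
    return hyperedges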
class HyperSAGNN(nn.Module):
    def __init__(self, num_nodes, embedding_dim=64, hidden_dim=32, num_heads=4, dropout=0.1):
        super(HyperSAGNN, self).__init__()
self.node_embedding = nn.Embedding(num_nodes, embedding_dim)
self.attention_layers = nn.ModuleList([
nn.MultiheadAttention(embed_dim=embedding_dim,
num_heads=num_heads,
dropout=dropout)
for _ in range(2) # 2 layers of self-attention
])
self.fc1 = nn.Linear(embedding_dim * 2, hidden_dim)
self.fc2 = nn.Linear(hidden_dim, 1)
self.dropout = nn.Dropout(dropout)
    def forward(self, edges):
        # Get node embeddings for the two endpoints of each edge
        node_i_embedding = self.node_embedding(edges[:, 0])
        node_j_embedding = self.node_embedding(edges[:, 1])

        # Reshape for attention: (seq_len, batch, embedding_dim)
        x = torch.stack([node_i_embedding, node_j_embedding], dim=0)

        # Apply self-attention layers with residual connections
        for attn_layer in self.attention_layers:
            attn_output, _ = attn_layer(x, x, x)
            x = x + attn_output  # Residual connection

        # Take the attended endpoint embeddings and concatenate them
        node_i_attended = x[0]
        node_j_attended = x[1]
        combined = torch.cat([node_i_attended, node_j_attended], dim=1)

        # MLP for prediction
        h = F.relu(self.fc1(combined))
        h = self.dropout(h)
        logits = self.fc2(h)
        return torch.sigmoid(logits).squeeze(-1)
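# A quick, illustrative sanity check of the model interface (a hypothetical
# helper, not part of the pipeline; the batch is random and the model
# untrained -- it only verifies input/output shapes):
def _sanity_check_hyper_sagnn(num_nodes=10, batch_size=4):
    model = HyperSAGNN(num_nodes=num_nodes).to(device)
    dummy_edges = torch.randint(0, num_nodes, (batch_size, 2), device=device)
    scores = model(dummy_edges)
    assert scores.shape == (batch_size,)  # one probability in (0, 1) per pair
    return scores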
class HyperGAT(nn.Module):
    def __init__(self, num_nodes, embedding_dim=64, hidden_dim=32, num_heads=4, dropout=0.1):
        super(HyperGAT, self).__init__()
self.node_embedding = nn.Embedding(num_nodes, embedding_dim)
        # GAT layers. gat1 concatenates its heads, so its per-head output size
        # is embedding_dim // num_heads (assumes embedding_dim is divisible by
        # num_heads); gat2 averages its heads back to embedding_dim.
        self.gat1 = GATLayer(embedding_dim, embedding_dim // num_heads, num_heads, dropout, concat=True)
        self.gat2 = GATLayer(embedding_dim, embedding_dim, num_heads, dropout, concat=False)

        # Prediction layers
        self.fc1 = nn.Linear(embedding_dim * 2, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, 1)
        self.dropout = nn.Dropout(dropout)

    def forward(self, edges, hyperedge_indices=None):
        # Get node embeddings
        node_i_embedding = self.node_embedding(edges[:, 0])
        node_j_embedding = self.node_embedding(edges[:, 1])

        # Apply GAT layers only when hyperedge_indices are provided
        if hyperedge_indices is not None:
            # This would require the actual hypergraph structure. For
            # simplicity, apply the GAT to the embeddings of the nodes that
            # appear in this batch.
            all_nodes = torch.unique(edges.flatten())
            all_embeddings = self.node_embedding(all_nodes)

            # Build a dense adjacency matrix from the batch edges. In a full
            # implementation this would be derived from the hypergraph.
            adj = torch.zeros(len(all_nodes), len(all_nodes), device=edges.device)
            for i in range(len(edges)):
                idx1 = (all_nodes == edges[i, 0]).nonzero(as_tuple=True)[0]
                idx2 = (all_nodes == edges[i, 1]).nonzero(as_tuple=True)[0]
                adj[idx1, idx2] = 1
                adj[idx2, idx1] = 1

            # Apply GAT layers
            h = self.gat1(all_embeddings, adj)
            h = F.elu(h)
            h = self.gat2(h, adj)

            # Map the attended embeddings back to the batch edges
            node_i_indices = [(all_nodes == edge[0]).nonzero(as_tuple=True)[0] for edge in edges]
            node_j_indices = [(all_nodes == edge[1]).nonzero(as_tuple=True)[0] for edge in edges]
            node_i_embedding = torch.stack([h[idx].squeeze(0) for idx in node_i_indices])
            node_j_embedding = torch.stack([h[idx].squeeze(0) for idx in node_j_indices])

        # Concatenate embeddings for prediction
        combined = torch.cat([node_i_embedding, node_j_embedding], dim=1)

        # MLP for prediction
        h = F.relu(self.fc1(combined))
        h = self.dropout(h)
        logits = self.fc2(h)
        return torch.sigmoid(logits).squeeze(-1)
class GATLayer(nn.Module):
    def __init__(self, in_features, out_features, num_heads, dropout, alpha=0.2, concat=True):
        super(GATLayer, self).__init__()
self.in_features = in_features
self.out_features = out_features
self.num_heads = num_heads
self.concat = concat
self.dropout = dropout
        # Trainable parameters: shared linear map W and attention vector a
        self.W = nn.Parameter(torch.empty(size=(in_features, num_heads * out_features)))
        self.a = nn.Parameter(torch.empty(size=(2 * out_features, 1)))
        self.leakyrelu = nn.LeakyReLU(alpha)
        self.dropout_layer = nn.Dropout(dropout)

        # Initialize parameters
        nn.init.xavier_uniform_(self.W.data)
        nn.init.xavier_uniform_(self.a.data)

    def forward(self, h, adj):
        N = h.size(0)

        # Linear transformation: (N, in_features) -> (N, num_heads, out_features)
        Wh = torch.mm(h, self.W).view(N, self.num_heads, self.out_features)

        # Build all ordered node pairs for the attention computation
        Wh1 = Wh.repeat_interleave(N, dim=0)  # (N*N, heads, F): each i repeated N times
        Wh2 = Wh.repeat(N, 1, 1)              # (N*N, heads, F): all j cycled N times
        e = torch.cat([Wh1, Wh2], dim=2)      # (N*N, heads, 2F)

        # Compute attention coefficients
        e = self.leakyrelu(torch.matmul(e, self.a).squeeze(2))  # (N*N, heads)
        e = e.view(N, N, self.num_heads)

        # Mask attention for non-neighbors
        zero_vec = -9e15 * torch.ones_like(e)
        attention = torch.where(adj.unsqueeze(2).expand_as(e) > 0, e, zero_vec)
        attention = F.softmax(attention, dim=1)
        attention = self.dropout_layer(attention)

        # Apply attention per head: (heads, N, N) @ (heads, N, F) -> (heads, N, F)
        h_prime = torch.matmul(attention.permute(2, 0, 1), Wh.permute(1, 0, 2))
        h_prime = h_prime.permute(1, 0, 2)    # back to (N, heads, F)

        # Concatenate or average attention heads
        if self.concat:
            h_prime = h_prime.reshape(N, self.num_heads * self.out_features)
        else:
            h_prime = h_prime.mean(dim=1)
        return h_prime
class HypergraphLinkDataset(Dataset):
    def __init__(self, edges, labels):
self.edges = torch.LongTensor(edges)
self.labels = torch.FloatTensor(labels)
    def __len__(self):
        return len(self.edges)

    def __getitem__(self, idx):
        return self.edges[idx], self.labels[idx]
def generate_negative_samples(hyperedges, num_nodes, num_samples, existing_edges):
negative_samples = []
existing_set = set(map(tuple, existing_edges))
    while len(negative_samples) < num_samples:
        i = random.randint(0, num_nodes - 1)
        j = random.randint(0, num_nodes - 1)
        # Avoid self-loops, existing edges, and duplicate negatives.
        # (Assumes the graph is sparse enough that enough non-edges exist;
        # otherwise this rejection loop would not terminate.)
        if i != j and (i, j) not in existing_set and (j, i) not in existing_set:
            negative_samples.append([i, j])
            existing_set.add((i, j))
    return np.array(negative_samples)
def hyperedges_to_pairwise(hyperedges):
# Get the hyperedges as a dictionary from the Hypergraph object
hyperedges_dict = hyperedges.incidence_dict
    all_nodes = []
    for edge in hyperedges_dict.values():
        all_nodes.extend(edge)

    # Get unique nodes, sort them, and map each node label to a numeric index
    unique_nodes = sorted(set(all_nodes))
    node_to_id = {node: i for i, node in enumerate(unique_nodes)}

    # Clique expansion: every pair of nodes within a hyperedge becomes an edge
    pairwise_edges = []
    for edge in hyperedges_dict.values():
        for i in range(len(edge)):
            for j in range(i + 1, len(edge)):
                pairwise_edges.append([node_to_id[edge[i]], node_to_id[edge[j]]])

    return np.array(pairwise_edges), node_to_id, len(unique_nodes)
def train_model(model, train_loader, val_loader, optimizer, num_epochs=100, patience=10, model_name="model"):
model.train()
best_val_auc = 0
patience_counter = 0
train_losses = []
val_metrics = []
    for epoch in range(num_epochs):
        model.train()  # evaluate_model() switches to eval mode, so reset here
        total_loss = 0
        for edges, labels in train_loader:
            edges, labels = edges.to(device), labels.to(device)
            optimizer.zero_grad()
            predictions = model(edges)
            loss = F.binary_cross_entropy(predictions, labels)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        # Evaluate on the validation set
        val_auc, val_precision, val_recall, val_f1 = evaluate_model(model, val_loader)
        val_metrics.append([val_auc, val_precision, val_recall, val_f1])

        avg_loss = total_loss / len(train_loader)
        train_losses.append(avg_loss)
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss:.4f}, Val AUC: {val_auc:.4f}, "
              f"Val Precision: {val_precision:.4f}, Val Recall: {val_recall:.4f}, Val F1: {val_f1:.4f}")

        # Early stopping on validation AUC
        if val_auc > best_val_auc:
            best_val_auc = val_auc
            torch.save(model.state_dict(), f"best_{model_name}_model.pt")
            patience_counter = 0
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print(f"Early stopping at epoch {epoch+1}")
                break

    # Load the best checkpoint before returning
    model.load_state_dict(torch.load(f"best_{model_name}_model.pt"))
    return model, train_losses, val_metrics
def evaluate_model(model, data_loader):
model.eval()
all_preds = []
all_labels = []
    with torch.no_grad():
        for edges, labels in data_loader:
            edges, labels = edges.to(device), labels.to(device)
            predictions = model(edges)
            all_preds.extend(predictions.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    all_preds = np.array(all_preds)
    all_labels = np.array(all_labels)

    # Calculate metrics (named auc_value to avoid shadowing sklearn names)
    auc_value = roc_auc_score(all_labels, all_preds)
    predictions_binary = (all_preds >= 0.5).astype(int)
    precision = precision_score(all_labels, predictions_binary, zero_division=0)
    recall = recall_score(all_labels, predictions_binary, zero_division=0)
    f1 = f1_score(all_labels, predictions_binary, zero_division=0)

    return auc_value, precision, recall, f1
def plot_roc_curve(model, test_loader, model_name="model"):
model.eval()
all_preds = []
all_labels = []
    with torch.no_grad():
        for edges, labels in test_loader:
            edges, labels = edges.to(device), labels.to(device)
            predictions = model(edges)
            all_preds.extend(predictions.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    all_preds = np.array(all_preds)
    all_labels = np.array(all_labels)

    # Calculate ROC curve points (roc_curve is imported at the top)
    fpr, tpr, _ = roc_curve(all_labels, all_preds)
    auc_score = roc_auc_score(all_labels, all_preds)

    # Plot ROC curve
    plt.figure(figsize=(10, 8))
    plt.plot(fpr, tpr, label=f'{model_name} (AUC = {auc_score:.3f})')
    plt.plot([0, 1], [0, 1], 'k--', label='Random')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title(f'ROC Curve for {model_name} Link Prediction')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.savefig(f'{model_name.lower()}_roc_curve.png', dpi=300, bbox_inches='tight')
    plt.show()

    # Calculate the remaining metrics and return everything
    predictions_binary = (all_preds >= 0.5).astype(int)
    precision = precision_score(all_labels, predictions_binary, zero_division=0)
    recall = recall_score(all_labels, predictions_binary, zero_division=0)
    f1 = f1_score(all_labels, predictions_binary, zero_division=0)

    return auc_score, precision, recall, f1, fpr, tpr
def run_hypergraph_link_prediction(hyperedges):
# Convert hyperedges to pairwise edges
pairwise_edges, node_to_id, num_nodes = hyperedges_to_pairwise(hyperedges)
    # Generate an equal number of negative samples
    negative_edges = generate_negative_samples(
        hyperedges, num_nodes, len(pairwise_edges), pairwise_edges)

    # Combine positive and negative edges with labels
    all_edges = np.vstack([pairwise_edges, negative_edges])
    labels = np.concatenate([np.ones(len(pairwise_edges)), np.zeros(len(negative_edges))])

    # Split data into train (60%), validation (20%), and test (20%) sets
    edges_train, edges_temp, labels_train, labels_temp = train_test_split(
        all_edges, labels, test_size=0.4, random_state=42, stratify=labels)
    edges_val, edges_test, labels_val, labels_test = train_test_split(
        edges_temp, labels_temp, test_size=0.5, random_state=42, stratify=labels_temp)

    # Create datasets and dataloaders
    train_dataset = HypergraphLinkDataset(edges_train, labels_train)
    val_dataset = HypergraphLinkDataset(edges_val, labels_val)
    test_dataset = HypergraphLinkDataset(edges_test, labels_test)

    batch_size = 64
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size)
    test_loader = DataLoader(test_dataset, batch_size=batch_size)

    # Initialize models
    sagnn_model = HyperSAGNN(num_nodes=num_nodes).to(device)
    hypergat_model = HyperGAT(num_nodes=num_nodes).to(device)

    # Train Hyper-SAGNN model
    print("\n=== Training Hyper-SAGNN Model ===")
    sagnn_optimizer = torch.optim.Adam(sagnn_model.parameters(), lr=0.001, weight_decay=1e-5)
    sagnn_model, sagnn_train_losses, sagnn_val_metrics = train_model(
        sagnn_model, train_loader, val_loader, sagnn_optimizer,
        num_epochs=200, patience=15, model_name="hyper_sagnn")

    # Train HyperGAT model
    print("\n=== Training HyperGAT Model ===")
    hypergat_optimizer = torch.optim.Adam(hypergat_model.parameters(), lr=0.001, weight_decay=1e-5)
    hypergat_model, hypergat_train_losses, hypergat_val_metrics = train_model(
        hypergat_model, train_loader, val_loader, hypergat_optimizer,
        num_epochs=200, patience=15, model_name="hypergat")

    # Evaluate Hyper-SAGNN on the test set
    print("\n=== Evaluating Hyper-SAGNN Model ===")
    sagnn_auc, sagnn_precision, sagnn_recall, sagnn_f1, sagnn_fpr, sagnn_tpr = plot_roc_curve(
        sagnn_model, test_loader, model_name="Hyper-SAGNN")

    # Evaluate HyperGAT on the test set
    print("\n=== Evaluating HyperGAT Model ===")
    hypergat_auc, hypergat_precision, hypergat_recall, hypergat_f1, hypergat_fpr, hypergat_tpr = plot_roc_curve(
        hypergat_model, test_loader, model_name="HyperGAT")

    print("\nHyper-SAGNN Test Metrics:")
    print(f"AUC: {sagnn_auc:.4f}")
    print(f"Precision: {sagnn_precision:.4f}")
    print(f"Recall: {sagnn_recall:.4f}")
    print(f"F1 Score: {sagnn_f1:.4f}")

    print("\nHyperGAT Test Metrics:")
    print(f"AUC: {hypergat_auc:.4f}")
    print(f"Precision: {hypergat_precision:.4f}")
    print(f"Recall: {hypergat_recall:.4f}")
    print(f"F1 Score: {hypergat_f1:.4f}")

    # Plot training metrics
    plot_training_metrics(
        sagnn_train_losses, sagnn_val_metrics,
        hypergat_train_losses, hypergat_val_metrics)

    # Compare with classical link prediction baselines
    compare_methods(
        sagnn_auc, sagnn_precision, sagnn_recall, sagnn_f1, sagnn_fpr, sagnn_tpr,
        hypergat_auc, hypergat_precision, hypergat_recall, hypergat_f1, hypergat_fpr, hypergat_tpr)

    return (sagnn_model, hypergat_model,
            sagnn_auc, sagnn_precision, sagnn_recall, sagnn_f1,
            hypergat_auc, hypergat_precision, hypergat_recall, hypergat_f1)
def plot_training_metrics(sagnn_train_losses, sagnn_val_metrics, hypergat_train_losses, hypergat_val_metrics):
plt.figure(figsize=(18, 10))
    # Plot training losses
    plt.subplot(2, 2, 1)
    plt.plot(sagnn_train_losses, label='Hyper-SAGNN')
    plt.plot(hypergat_train_losses, label='HyperGAT')
    plt.title('Training Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.grid(True, alpha=0.3)

    # Plot validation AUC
    plt.subplot(2, 2, 2)
    sagnn_val_metrics = np.array(sagnn_val_metrics)
    hypergat_val_metrics = np.array(hypergat_val_metrics)
    plt.plot(sagnn_val_metrics[:, 0], label='Hyper-SAGNN AUC')
    plt.plot(hypergat_val_metrics[:, 0], label='HyperGAT AUC')
    plt.title('Validation AUC')
    plt.xlabel('Epoch')
    plt.ylabel('AUC')
    plt.legend()
    plt.grid(True, alpha=0.3)

    # Plot validation precision, recall and F1 for Hyper-SAGNN
    plt.subplot(2, 2, 3)
    plt.plot(sagnn_val_metrics[:, 1], label='Precision')
    plt.plot(sagnn_val_metrics[:, 2], label='Recall')
    plt.plot(sagnn_val_metrics[:, 3], label='F1')
    plt.title('Hyper-SAGNN Validation Metrics')
    plt.xlabel('Epoch')
    plt.ylabel('Score')
    plt.legend()
    plt.grid(True, alpha=0.3)

    # Plot validation precision, recall and F1 for HyperGAT
    plt.subplot(2, 2, 4)
    plt.plot(hypergat_val_metrics[:, 1], label='Precision')
    plt.plot(hypergat_val_metrics[:, 2], label='Recall')
    plt.plot(hypergat_val_metrics[:, 3], label='F1')
    plt.title('HyperGAT Validation Metrics')
    plt.xlabel('Epoch')
    plt.ylabel('Score')
    plt.legend()
    plt.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.savefig('training_metrics_comparison.png', dpi=300, bbox_inches='tight')
    plt.show()
def compare_methods(sagnn_auc, sagnn_precision, sagnn_recall, sagnn_f1, sagnn_fpr, sagnn_tpr,
hypergat_auc, hypergat_precision, hypergat_recall, hypergat_f1, hypergat_fpr, hypergat_tpr):
# Previous methods' metrics
methods = [
'Common Neighbors',
'Jaccard',
'Adamic Adar',
'Preferential Attachment',
'Katz',
'Hyperlink Resource Allocation',
'Hyper-SAGNN',
'HyperGAT'
]
    aucs = [0.54, 0.30, 0.54, 0.55, 0.55, 0.57, sagnn_auc, hypergat_auc]
    precisions = [0.91, 1.00, 0.91, 0.84, 0.83, 1.00, sagnn_precision, hypergat_precision]
    recalls = [0.40, 0.02, 0.38, 0.50, 0.48, 0.40, sagnn_recall, hypergat_recall]
    f1_scores = [0.56, 0.04, 0.54, 0.63, 0.61, 0.58, sagnn_f1, hypergat_f1]

    # Grouped bar chart comparing all metrics across methods
    plt.figure(figsize=(16, 10))
    x = np.arange(len(methods))
    width = 0.2

    plt.bar(x - width*1.5, aucs, width, label='AUC', color='blue')
    plt.bar(x - width/2, precisions, width, label='Precision', color='orange')
    plt.bar(x + width/2, recalls, width, label='Recall', color='green')
    plt.bar(x + width*1.5, f1_scores, width, label='F1 Score', color='red')

    plt.xlabel('Methods')
    plt.ylabel('Score')
    plt.title('Metrics Comparison Across Methods')
    plt.xticks(x, methods, rotation=45, ha='right')
    plt.legend()
    plt.ylim(0, 1.1)
    plt.grid(True, alpha=0.3, linestyle='--')
    plt.tight_layout()
    plt.savefig('all_methods_comparison.png', dpi=300, bbox_inches='tight')
    plt.show()

    # ROC curves comparison
    plt.figure(figsize=(10, 8))
    plt.plot(sagnn_fpr, sagnn_tpr, label=f'Hyper-SAGNN (AUC = {sagnn_auc:.3f})', linewidth=2)
    plt.plot(hypergat_fpr, hypergat_tpr, label=f'HyperGAT (AUC = {hypergat_auc:.3f})', linewidth=2)
    plt.plot([0, 1], [0, 1], 'k--', label='Random')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('ROC Curves Comparison')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.savefig('roc_curves_comparison.png', dpi=300, bbox_inches='tight')
    plt.show()
if __name__ == "__main__":
H = create_hypergraph()
print(f"Created hypergraph with {len(H.nodes)} nodes and {len(H.edges)} hyperedges")
    results = run_hypergraph_link_prediction(H)
    (sagnn_model, hypergat_model,
     sagnn_auc, sagnn_precision, sagnn_recall, sagnn_f1,
     hypergat_auc, hypergat_precision, hypergat_recall, hypergat_f1) = results

    print("\nFinal Results Summary:")
    print("\nHyper-SAGNN Performance:")
    print(f"AUC: {sagnn_auc:.4f}")
    print(f"Precision: {sagnn_precision:.4f}")
    print(f"Recall: {sagnn_recall:.4f}")
    print(f"F1 Score: {sagnn_f1:.4f}")

    print("\nHyperGAT Performance:")
    print(f"AUC: {hypergat_auc:.4f}")
    print(f"Precision: {hypergat_precision:.4f}")
    print(f"Recall: {hypergat_recall:.4f}")
    print(f"F1 Score: {hypergat_f1:.4f}")
I need to test how good a link prediction algorithm is on the hypergraph when some hyperedges are hidden. 1. As input I should have H and the hyperedges to hide, so that I can test how many links the algorithm predicts; then I check how many of the predicted links correspond to links that exist in H. How many of the total does it manage to find? 2. Which predicted links do the algorithms have in common? 3. How important are the correctly predicted nodes?
Here is how to proceed, step by step, to test the quality of the link prediction algorithm with hidden hyperedges, covering your three objectives.

1. Hide hyperedges and measure how many are recovered

Input: the complete hypergraph H and the list of hyperedges to hide (hidden_hyperedges).

Start from the complete hypergraph H and hide a set of hyperedges (hidden_hyperedges), creating a new, reduced hypergraph H_train:
hidden_hyperedges = ["CF2", "CF5", "CF10"]  # example of hidden hyperedges
visible_edges = {key: val for key, val in H.incidence_dict.items()
                 if key not in hidden_hyperedges}
H_train = hnx.Hypergraph(visible_edges)
Use H_train to train the link prediction algorithm; the goal is to predict the hidden hyperedges (hidden_hyperedges). After generating the predictions, check how many of the predicted links correspond to links contained in the hidden hyperedges.
# Example of predictions (from the model output):
predicted_links = [["P1","J2"], ["P3","T6"], ...]
def hyperedges_to_pairs(hyperedges_dict):
    pairs = set()
    for edge_nodes in hyperedges_dict.values():
        for pair in combinations(edge_nodes, 2):
            pairs.add(tuple(sorted(pair)))
    return pairs

hidden_pairs = hyperedges_to_pairs({k: H.incidence_dict[k] for k in hidden_hyperedges})
predicted_set = set(tuple(sorted(link)) for link in predicted_links)
correct_predictions = predicted_set.intersection(hidden_pairs)

# Fraction of hidden pairs that were recovered (i.e., recall on hidden links)
recovery_rate = len(correct_predictions) / len(hidden_pairs)
print(f"Correctly predicted links: {len(correct_predictions)} out of {len(hidden_pairs)} total.")
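To obtain predicted_links in practice, you can score all candidate node pairs with a trained model and keep those above a threshold. A minimal sketch, assuming the script above has been run (the helper name predict_links and the 0.5 threshold are illustrative choices, not part of the original code):

def predict_links(model, candidate_pairs, node_to_id, threshold=0.5):
    # Hypothetical helper: scores candidate node-label pairs with a trained
    # model and keeps those whose predicted probability exceeds the threshold.
    model.eval()
    ids = torch.LongTensor([[node_to_id[u], node_to_id[v]]
                            for u, v in candidate_pairs]).to(device)
    with torch.no_grad():
        scores = model(ids).cpu().numpy()
    return [list(pair) for pair, s in zip(candidate_pairs, scores) if s >= threshold]

# Usage sketch: score every pair of nodes of H_train
# _, node_to_id, _ = hyperedges_to_pairwise(H_train)
# candidates = list(combinations(sorted(node_to_id), 2))
# predicted_links = predict_links(sagnn_model, candidates, node_to_id)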
2. Which predicted links do the algorithms have in common?

If you have several algorithms (e.g., HyperSAGNN and HyperGAT), intersect their prediction sets:

predictions_sagnn = {("P1","J2"), ("P3","T6"), ...}
predictions_gat = {("P1","J2"), ("P4","T7"), ...}

# Intersection of the predictions
common_predictions = predictions_sagnn.intersection(predictions_gat)
print(f"Links predicted by both algorithms: {common_predictions}")
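To quantify the agreement rather than just listing it, you can also compute the Jaccard overlap of the two sets (a small addition reusing the sets defined above):

# Jaccard overlap: shared predictions / union of all predictions
jaccard_overlap = len(common_predictions) / len(predictions_sagnn | predictions_gat)
print(f"Jaccard overlap between the two prediction sets: {jaccard_overlap:.3f}")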
3. How important are the correctly predicted nodes?

To understand the importance of the correctly predicted nodes, you can look at their centrality in the hypergraph, for example their degree (the number of hyperedges a node belongs to).

Example of a degree-based importance analysis (using HyperNetX's H.degree):

correct_nodes = set(n for link in correct_predictions for n in link)

# Node importance as hypergraph degree (number of hyperedges containing the node)
centrality = {node: H.degree(node) for node in H.nodes}
importance_correct_nodes = {node: centrality[node] for node in correct_nodes}
sorted_importance = sorted(importance_correct_nodes.items(), key=lambda x: x[1], reverse=True)

print("Importance of the correctly predicted nodes:")
for node, importance in sorted_importance:
    print(f"Node: {node}, degree: {importance}")
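To judge whether the correctly predicted nodes are more central than average, you can compare their mean degree with that of all nodes (a sketch under the same assumptions as above):

# Mean degree of all nodes vs. the correctly predicted ones
all_degrees = [H.degree(n) for n in H.nodes]
correct_degrees = [centrality[n] for n in correct_nodes]
print(f"Mean degree, all nodes: {np.mean(all_degrees):.2f}")
print(f"Mean degree, correctly predicted nodes: {np.mean(correct_degrees):.2f}")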
To recap the workflow: hide some hyperedges of H to create H_train, train the models on H_train, and collect their predictions. This strategy gives you:
- a quantitative measure of how many hidden links are recovered (objective 1);
- the set of predicted links the algorithms agree on (objective 2);
- an importance analysis of the correctly predicted nodes (objective 3).

A compact way to report objective 1 is to compute both precision and recall of the predicted set against the hidden pairs, as in the sketch below.
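A sketch reusing predicted_set, correct_predictions, and hidden_pairs from above:

# Precision and recall of the prediction set vs. the hidden pairs
precision_hidden = len(correct_predictions) / len(predicted_set) if predicted_set else 0.0
recall_hidden = len(correct_predictions) / len(hidden_pairs)
print(f"Precision on hidden links: {precision_hidden:.3f}")
print(f"Recall on hidden links: {recall_hidden:.3f}")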
In this way you get a complete picture of the reliability and effectiveness of your link prediction methodology on hypergraphs.