import pandas as pd import hypernetx as hnx import...

Created on: July 31, 2025

Answered using GPT-4.5 by Chat01

Question

```python
import pandas as pd
import hypernetx as hnx
import random
from itertools import combinations
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import roc_auc_score, precision_recall_curve, auc, precision_score, recall_score, f1_score
from sklearn.model_selection import train_test_split
import os
```

Set random seeds for reproducibility

```python
random.seed(42)
np.random.seed(42)
torch.manual_seed(42)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(42)
```

Check for GPU

```python
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
```

1. Create the hypergraph

```python
def create_hypergraph(networkFile=None):
    # Sample data if no file provided
    data = {
        "CF1": ["P1", "J1", "L1", "T1"],
        "CF2": ["P1", "J2", "L1", "T2"],
        "CF3": ["P2", "J3", "L1", "T3"],
        "CF4": ["P3", "J1", "L1", "T4", "T5"],
        "CF5": ["P3", "J3", "L1", "T6"],
        "CF6": ["P4", "J4", "L1", "T7"],
        "CF7": ["P5", "J5", "L1", "T8", "T9"],
        "CF8": ["P6", "J5", "L1", "T5"],
        "CF9": ["P7", "J5", "L1", "T5"],
        "CF10": ["P8", "J1", "L1", "T1"],
        "CF11": ["P9", "J5", "L1", "T10"],
        "CF12": ["P10", "J6", "L1", "T6"],
        "CF13": ["P10", "J5", "L1", "T7"],
        "CF14": ["P11", "P12", "J1", "L1", "T8"],
        "CF15": ["P13", "J1", "L1", "T6"],
        "CF16": ["P13", "J5", "L1", "T7"],
        "CF17": ["P13", "J1", "L1", "T1", "T2"],
        "CF18": ["P14", "J1", "L1", "T9"],
        "CF19": ["P15", "J2", "L1", "T10"],
        "CF20": ["P15", "J4", "L1", "T11"],
        "CF21": ["P16", "J2", "L1", "T12"],
        "CF22": ["P17", "J7", "L1", "T13"],
        "CF23": ["P18", "J4", "L1", "T14"],
        "CF24": ["P18", "J5", "L1", "T15"],
        "CF25": ["P18", "J5", "L1", "T16", "T17"],
        "CF26": ["P18", "J4", "L1", "T18"],
        "CF27": ["P18", "J5", "L1", "T19"],
        "CF28": ["P18", "J6", "L1", "T20"],
        "CF29": ["P19", "J1", "L1", "T1"],
        "CF30": ["P20", "J3", "L1", "T1"],
        "CF31": ["P21", "J5", "L2", "T4"],
        "CF32": ["P22", "J1", "L1", "T1"],
        "CF33": ["P23", "J5", "L1", "T4"],
        "CF34": ["P24", "J1", "L1", "T1"],
        "CF35": ["P25", "J4", "L3", "T21"],
        "CF36": ["P26", "J5", "L1", "L4", "T22"],
        "CF37": ["P27", "J5", "L3", "T7"],
        "CF38": ["P28", "J1", "L3", "T1"],
        "CF39": ["P29", "J4", "L3", "T23"],
        "CF40": ["P29", "J2", "L3", "T24"],
        "CF41": ["P30", "J1", "L3", "T4"],
        "CF42": ["P31", "J4", "L3", "T4"],
        "CF43": ["P32", "J1", "L3", "T4"],
        "CF44": ["P33", "J4", "L3", "T4"]
    }

    # If networkFile is provided, read data from file
    if networkFile:
        # Add code to read from file
        pass

    H = hnx.Hypergraph(data)
    return H
```

Define the Hyper-SAGNN model

```python
class HyperSAGNN(nn.Module):
    def __init__(self, num_nodes, embedding_dim=64, hidden_dim=32, num_heads=4, dropout=0.1):
        super(HyperSAGNN, self).__init__()
        self.node_embedding = nn.Embedding(num_nodes, embedding_dim)
        self.attention_layers = nn.ModuleList([
            nn.MultiheadAttention(embed_dim=embedding_dim,
                                  num_heads=num_heads,
                                  dropout=dropout)
            for _ in range(2)  # 2 layers of self-attention
        ])
        self.fc1 = nn.Linear(embedding_dim * 2, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, 1)
        self.dropout = nn.Dropout(dropout)

    def forward(self, edges):
        # Get node embeddings for the edges
        node_i_embedding = self.node_embedding(edges[:, 0])
        node_j_embedding = self.node_embedding(edges[:, 1])

        # Reshape for attention (seq_len, batch, embedding_dim)
        x = torch.stack([node_i_embedding, node_j_embedding], dim=0)

        # Apply self-attention layers
        for attn_layer in self.attention_layers:
            attn_output, _ = attn_layer(x, x, x)
            x = x + attn_output  # Residual connection

        # Take the attended embeddings and concatenate
        node_i_attended = x[0]
        node_j_attended = x[1]
        combined = torch.cat([node_i_attended, node_j_attended], dim=1)

        # MLP for prediction
        h = F.relu(self.fc1(combined))
        h = self.dropout(h)
        logits = self.fc2(h)

        return torch.sigmoid(logits).squeeze()
```

Define the HyperGAT model

```python
class HyperGAT(nn.Module):
    def __init__(self, num_nodes, embedding_dim=64, hidden_dim=32, num_heads=4, dropout=0.1):
        super(HyperGAT, self).__init__()
        self.node_embedding = nn.Embedding(num_nodes, embedding_dim)

        # GAT layers
        self.gat1 = GATLayer(embedding_dim, embedding_dim, num_heads, dropout)
        self.gat2 = GATLayer(embedding_dim, embedding_dim, num_heads, dropout)

        # Prediction layers
        self.fc1 = nn.Linear(embedding_dim * 2, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, 1)
        self.dropout = nn.Dropout(dropout)

    def forward(self, edges, hyperedge_indices=None):
        # Get node embeddings
        node_i_embedding = self.node_embedding(edges[:, 0])
        node_j_embedding = self.node_embedding(edges[:, 1])

        # Apply GAT layers if hyperedge_indices are provided
        if hyperedge_indices is not None:
            # This would require the actual hypergraph structure
            # For simplicity, we'll just apply the GAT on the node embeddings directly
            all_nodes = torch.unique(edges.flatten())
            all_embeddings = self.node_embedding(all_nodes)

            # Create a dummy adjacency matrix for demonstration
            # In a real implementation, this would be derived from the hypergraph
            adj = torch.zeros(len(all_nodes), len(all_nodes), device=device)
            for i in range(len(edges)):
                idx1 = (all_nodes == edges[i, 0]).nonzero(as_tuple=True)[0]
                idx2 = (all_nodes == edges[i, 1]).nonzero(as_tuple=True)[0]
                adj[idx1, idx2] = 1
                adj[idx2, idx1] = 1

            # Apply GAT layers
            h = self.gat1(all_embeddings, adj)
            h = F.elu(h)
            h = self.gat2(h, adj)

            # Map back to original edges
            node_i_indices = [(all_nodes == edge[0]).nonzero(as_tuple=True)[0] for edge in edges]
            node_j_indices = [(all_nodes == edge[1]).nonzero(as_tuple=True)[0] for edge in edges]
            node_i_embedding = torch.stack([h[idx] for idx in node_i_indices])
            node_j_embedding = torch.stack([h[idx] for idx in node_j_indices])

        # Concatenate embeddings for prediction
        combined = torch.cat([node_i_embedding, node_j_embedding], dim=1)

        # MLP for prediction
        h = F.relu(self.fc1(combined))
        h = self.dropout(h)
        logits = self.fc2(h)

        return torch.sigmoid(logits).squeeze()
```

GAT Layer implementation

```python
class GATLayer(nn.Module):
    def __init__(self, in_features, out_features, num_heads, dropout, alpha=0.2, concat=True):
        super(GATLayer, self).__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.num_heads = num_heads
        self.concat = concat
        self.dropout = dropout

        # Define trainable parameters
        self.W = nn.Parameter(torch.empty(size=(in_features, num_heads * out_features)))
        self.a = nn.Parameter(torch.empty(size=(2 * out_features, 1)))
        self.leakyrelu = nn.LeakyReLU(alpha)
        self.dropout_layer = nn.Dropout(dropout)

        # Initialize parameters
        nn.init.xavier_uniform_(self.W.data)
        nn.init.xavier_uniform_(self.a.data)

    def forward(self, h, adj):
        batch_size = h.size(0)

        # Linear transformation
        Wh = torch.mm(h, self.W)
        Wh = Wh.view(batch_size, self.num_heads, self.out_features)

        # Repeat for attention computation
        Wh1 = Wh.repeat(1, batch_size, 1)
        Wh2 = Wh.repeat(batch_size, 1, 1)

        # Attention mechanism
        # Concatenate for attention calculation
        e = torch.cat([Wh1, Wh2], dim=2)
        e = e.view(batch_size * batch_size, self.num_heads, 2 * self.out_features)

        # Compute attention coefficients
        e = self.leakyrelu(torch.matmul(e, self.a).squeeze(2))
        e = e.view(batch_size, batch_size, self.num_heads)

        # Mask attention for non-neighbors
        zero_vec = -9e15 * torch.ones_like(e)
        attention = torch.where(adj.unsqueeze(2).expand_as(e) > 0, e, zero_vec)
        attention = F.softmax(attention, dim=1)
        attention = self.dropout_layer(attention)

        # Apply attention to get output
        h_prime = torch.matmul(attention, Wh)

        # Concatenate or average attention heads
        if self.concat:
            h_prime = h_prime.view(batch_size, self.num_heads * self.out_features)
        else:
            h_prime = h_prime.mean(dim=1)

        return h_prime
```

Dataset for hypergraph link prediction

```python
class HypergraphLinkDataset(Dataset):
    def __init__(self, edges, labels):
        self.edges = torch.LongTensor(edges)
        self.labels = torch.FloatTensor(labels)

    def __len__(self):
        return len(self.edges)

    def __getitem__(self, idx):
        return self.edges[idx], self.labels[idx]
```

Function to generate negative samples

```python
def generate_negative_samples(hyperedges, num_nodes, num_samples, existing_edges):
    negative_samples = []
    existing_set = set(map(tuple, existing_edges))

    while len(negative_samples) < num_samples:
        i = random.randint(0, num_nodes - 1)
        j = random.randint(0, num_nodes - 1)

        # Avoid self-loops and existing edges
        if i != j and (i, j) not in existing_set and (j, i) not in existing_set:
            negative_samples.append([i, j])
            existing_set.add((i, j))

    return np.array(negative_samples)
```

Function to convert hyperedges to pairwise edges with numerical node IDs

```python
def hyperedges_to_pairwise(hyperedges):
    # Get the hyperedges as a dictionary from the Hypergraph object
    hyperedges_dict = hyperedges.incidence_dict

    all_nodes = []
    for edge in hyperedges_dict.values():
        all_nodes.extend(edge)
    unique_nodes = sorted(list(set(all_nodes)))  # Get unique nodes and sort them
    node_to_id = {node: i for i, node in enumerate(unique_nodes)}

    pairwise_edges = []
    for edge in hyperedges_dict.values():  # Iterate over hyperedge values
        for i in range(len(edge)):
            for j in range(i + 1, len(edge)):
                # Convert node IDs to numerical indices using node_to_id
                pairwise_edges.append([node_to_id[edge[i]], node_to_id[edge[j]]])

    return np.array(pairwise_edges), node_to_id, len(unique_nodes)
```

Function to train the model

```python
def train_model(model, train_loader, val_loader, optimizer, num_epochs=100, patience=10, model_name="model"):
    best_val_auc = 0
    patience_counter = 0
    train_losses = []
    val_metrics = []

    for epoch in range(num_epochs):
        model.train()  # evaluate_model() switches to eval mode, so reset each epoch
        total_loss = 0
        for edges, labels in train_loader:
            edges, labels = edges.to(device), labels.to(device)
            optimizer.zero_grad()
            predictions = model(edges)
            loss = F.binary_cross_entropy(predictions, labels)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        # Evaluate on validation set
        val_auc, val_precision, val_recall, val_f1 = evaluate_model(model, val_loader)
        val_metrics.append([val_auc, val_precision, val_recall, val_f1])

        avg_loss = total_loss / len(train_loader)
        train_losses.append(avg_loss)

        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss:.4f}, Val AUC: {val_auc:.4f}, "
              f"Val Precision: {val_precision:.4f}, Val Recall: {val_recall:.4f}, Val F1: {val_f1:.4f}")

        # Early stopping
        if val_auc > best_val_auc:
            best_val_auc = val_auc
            torch.save(model.state_dict(), f"best_{model_name}_model.pt")
            patience_counter = 0
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print(f"Early stopping at epoch {epoch+1}")
                break

    # Load best model
    model.load_state_dict(torch.load(f"best_{model_name}_model.pt"))
    return model, train_losses, val_metrics
```

Function to evaluate the model

```python
def evaluate_model(model, data_loader):
    model.eval()
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for edges, labels in data_loader:
            edges, labels = edges.to(device), labels.to(device)
            predictions = model(edges)
            all_preds.extend(predictions.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    all_preds = np.array(all_preds)
    all_labels = np.array(all_labels)

    # Calculate metrics
    auc = roc_auc_score(all_labels, all_preds)
    predictions_binary = (all_preds >= 0.5).astype(int)
    precision = precision_score(all_labels, predictions_binary)
    recall = recall_score(all_labels, predictions_binary)
    f1 = f1_score(all_labels, predictions_binary)

    return auc, precision, recall, f1
```

Function to plot ROC curve

```python
def plot_roc_curve(model, test_loader, model_name="model"):
    model.eval()
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for edges, labels in test_loader:
            edges, labels = edges.to(device), labels.to(device)
            predictions = model(edges)
            all_preds.extend(predictions.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    all_preds = np.array(all_preds)
    all_labels = np.array(all_labels)

    # Calculate ROC curve points
    from sklearn.metrics import roc_curve
    fpr, tpr, _ = roc_curve(all_labels, all_preds)
    auc_score = roc_auc_score(all_labels, all_preds)

    # Plot ROC curve
    plt.figure(figsize=(10, 8))
    plt.plot(fpr, tpr, label=f'{model_name} (AUC = {auc_score:.3f})')
    plt.plot([0, 1], [0, 1], 'k--', label='Random')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title(f'ROC Curve for {model_name} Link Prediction')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.savefig(f'{model_name.lower()}_roc_curve.png', dpi=300, bbox_inches='tight')
    plt.show()

    # Calculate other metrics and return
    predictions_binary = (all_preds >= 0.5).astype(int)
    precision = precision_score(all_labels, predictions_binary)
    recall = recall_score(all_labels, predictions_binary)
    f1 = f1_score(all_labels, predictions_binary)

    return auc_score, precision, recall, f1, fpr, tpr
```

Main function to run the link prediction

```python
def run_hypergraph_link_prediction(hyperedges):
    # Convert hyperedges to pairwise edges
    pairwise_edges, node_to_id, num_nodes = hyperedges_to_pairwise(hyperedges)

    # Generate an equal number of negative samples
    negative_edges = generate_negative_samples(
        hyperedges, num_nodes, len(pairwise_edges), pairwise_edges)

    # Combine positive and negative edges with labels
    all_edges = np.vstack([pairwise_edges, negative_edges])
    labels = np.concatenate([np.ones(len(pairwise_edges)), np.zeros(len(negative_edges))])

    # Split data into train, validation, and test sets
    edges_train, edges_temp, labels_train, labels_temp = train_test_split(
        all_edges, labels, test_size=0.4, random_state=42, stratify=labels)
    edges_val, edges_test, labels_val, labels_test = train_test_split(
        edges_temp, labels_temp, test_size=0.5, random_state=42, stratify=labels_temp)

    # Create datasets and dataloaders
    train_dataset = HypergraphLinkDataset(edges_train, labels_train)
    val_dataset = HypergraphLinkDataset(edges_val, labels_val)
    test_dataset = HypergraphLinkDataset(edges_test, labels_test)

    batch_size = 64
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size)
    test_loader = DataLoader(test_dataset, batch_size=batch_size)

    # Initialize models
    sagnn_model = HyperSAGNN(num_nodes=num_nodes).to(device)
    hypergat_model = HyperGAT(num_nodes=num_nodes).to(device)

    # Train SAGNN model
    print("\n=== Training Hyper-SAGNN Model ===")
    sagnn_optimizer = torch.optim.Adam(sagnn_model.parameters(), lr=0.001, weight_decay=1e-5)
    sagnn_model, sagnn_train_losses, sagnn_val_metrics = train_model(
        sagnn_model, train_loader, val_loader, sagnn_optimizer,
        num_epochs=200, patience=15, model_name="hyper_sagnn")

    # Train HyperGAT model
    print("\n=== Training HyperGAT Model ===")
    hypergat_optimizer = torch.optim.Adam(hypergat_model.parameters(), lr=0.001, weight_decay=1e-5)
    hypergat_model, hypergat_train_losses, hypergat_val_metrics = train_model(
        hypergat_model, train_loader, val_loader, hypergat_optimizer,
        num_epochs=200, patience=15, model_name="hypergat")

    # Evaluate SAGNN on test set
    print("\n=== Evaluating Hyper-SAGNN Model ===")
    sagnn_auc, sagnn_precision, sagnn_recall, sagnn_f1, sagnn_fpr, sagnn_tpr = plot_roc_curve(
        sagnn_model, test_loader, model_name="Hyper-SAGNN")

    # Evaluate HyperGAT on test set
    print("\n=== Evaluating HyperGAT Model ===")
    hypergat_auc, hypergat_precision, hypergat_recall, hypergat_f1, hypergat_fpr, hypergat_tpr = plot_roc_curve(
        hypergat_model, test_loader, model_name="HyperGAT")

    print("\nHyper-SAGNN Test Metrics:")
    print(f"AUC: {sagnn_auc:.4f}")
    print(f"Precision: {sagnn_precision:.4f}")
    print(f"Recall: {sagnn_recall:.4f}")
    print(f"F1 Score: {sagnn_f1:.4f}")

    print("\nHyperGAT Test Metrics:")
    print(f"AUC: {hypergat_auc:.4f}")
    print(f"Precision: {hypergat_precision:.4f}")
    print(f"Recall: {hypergat_recall:.4f}")
    print(f"F1 Score: {hypergat_f1:.4f}")

    # Plot training metrics
    plot_training_metrics(
        sagnn_train_losses, sagnn_val_metrics,
        hypergat_train_losses, hypergat_val_metrics)

    # Compare with other methods
    compare_methods(
        sagnn_auc, sagnn_precision, sagnn_recall, sagnn_f1, sagnn_fpr, sagnn_tpr,
        hypergat_auc, hypergat_precision, hypergat_recall, hypergat_f1, hypergat_fpr, hypergat_tpr)

    return sagnn_model, hypergat_model, sagnn_auc, sagnn_precision, sagnn_recall, sagnn_f1, hypergat_auc, hypergat_precision, hypergat_recall, hypergat_f1
```

Function to plot training metrics

```python
def plot_training_metrics(sagnn_train_losses, sagnn_val_metrics, hypergat_train_losses, hypergat_val_metrics):
    plt.figure(figsize=(18, 10))

    # Plot training losses
    plt.subplot(2, 2, 1)
    plt.plot(sagnn_train_losses, label='Hyper-SAGNN')
    plt.plot(hypergat_train_losses, label='HyperGAT')
    plt.title('Training Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.grid(True, alpha=0.3)

    # Plot validation AUC
    plt.subplot(2, 2, 2)
    sagnn_val_metrics = np.array(sagnn_val_metrics)
    hypergat_val_metrics = np.array(hypergat_val_metrics)
    plt.plot(sagnn_val_metrics[:, 0], label='Hyper-SAGNN AUC')
    plt.plot(hypergat_val_metrics[:, 0], label='HyperGAT AUC')
    plt.title('Validation AUC')
    plt.xlabel('Epoch')
    plt.ylabel('AUC')
    plt.legend()
    plt.grid(True, alpha=0.3)

    # Plot validation Precision and Recall for SAGNN
    plt.subplot(2, 2, 3)
    plt.plot(sagnn_val_metrics[:, 1], label='Precision')
    plt.plot(sagnn_val_metrics[:, 2], label='Recall')
    plt.plot(sagnn_val_metrics[:, 3], label='F1')
    plt.title('Hyper-SAGNN Validation Metrics')
    plt.xlabel('Epoch')
    plt.ylabel('Score')
    plt.legend()
    plt.grid(True, alpha=0.3)

    # Plot validation Precision and Recall for HyperGAT
    plt.subplot(2, 2, 4)
    plt.plot(hypergat_val_metrics[:, 1], label='Precision')
    plt.plot(hypergat_val_metrics[:, 2], label='Recall')
    plt.plot(hypergat_val_metrics[:, 3], label='F1')
    plt.title('HyperGAT Validation Metrics')
    plt.xlabel('Epoch')
    plt.ylabel('Score')
    plt.legend()
    plt.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.savefig('training_metrics_comparison.png', dpi=300, bbox_inches='tight')
    plt.show()
```

Function to compare methods

```python
def compare_methods(sagnn_auc, sagnn_precision, sagnn_recall, sagnn_f1, sagnn_fpr, sagnn_tpr,
                    hypergat_auc, hypergat_precision, hypergat_recall, hypergat_f1, hypergat_fpr, hypergat_tpr):
    # Previous methods' metrics
    methods = [
        'Common Neighbors',
        'Jaccard',
        'Adamic Adar',
        'Preferential Attachment',
        'Katz',
        'Hyperlink Resource Allocation',
        'Hyper-SAGNN',
        'HyperGAT'
    ]

    aucs = [0.54, 0.30, 0.54, 0.55, 0.55, 0.57, sagnn_auc, hypergat_auc]
    precisions = [0.91, 1.00, 0.91, 0.84, 0.83, 1.00, sagnn_precision, hypergat_precision]
    recalls = [0.40, 0.02, 0.38, 0.50, 0.48, 0.40, sagnn_recall, hypergat_recall]
    f1_scores = [0.56, 0.04, 0.54, 0.63, 0.61, 0.58, sagnn_f1, hypergat_f1]

    # Plot bar chart comparison
    plt.figure(figsize=(16, 10))

    # Create a single bar plot for all metrics
    x = np.arange(len(methods))
    width = 0.2

    # Plot all metrics in a single chart
    plt.bar(x - width*1.5, aucs, width, label='AUC', color='blue')
    plt.bar(x - width/2, precisions, width, label='Precision', color='orange')
    plt.bar(x + width/2, recalls, width, label='Recall', color='green')
    plt.bar(x + width*1.5, f1_scores, width, label='F1 Score', color='red')

    plt.xlabel('Methods')
    plt.ylabel('Score')
    plt.title('Metrics Comparison Across Methods')
    plt.xticks(x, methods, rotation=45, ha='right')
    plt.legend()
    plt.ylim(0, 1.1)
    plt.grid(True, alpha=0.3, linestyle='--')
    plt.tight_layout()
    plt.savefig('all_methods_comparison.png', dpi=300, bbox_inches='tight')
    plt.show()

    # Plot ROC curves comparison
    plt.figure(figsize=(10, 8))

    # Plot ROC curves
    plt.plot(sagnn_fpr, sagnn_tpr, label=f'Hyper-SAGNN (AUC = {sagnn_auc:.3f})', linewidth=2)
    plt.plot(hypergat_fpr, hypergat_tpr, label=f'HyperGAT (AUC = {hypergat_auc:.3f})', linewidth=2)
    plt.plot([0, 1], [0, 1], 'k--', label='Random')

    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('ROC Curves Comparison')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.savefig('roc_curves_comparison.png', dpi=300, bbox_inches='tight')
    plt.show()
```

Main execution

```python
if __name__ == "__main__":
    H = create_hypergraph()
    print(f"Created hypergraph with {len(H.nodes)} nodes and {len(H.edges)} hyperedges")

    results = run_hypergraph_link_prediction(H)
    sagnn_model, hypergat_model, sagnn_auc, sagnn_precision, sagnn_recall, sagnn_f1, hypergat_auc, hypergat_precision, hypergat_recall, hypergat_f1 = results

    print("\nFinal Results Summary:")
    print("\nHyper-SAGNN Performance:")
    print(f"AUC: {sagnn_auc:.4f}")
    print(f"Precision: {sagnn_precision:.4f}")
    print(f"Recall: {sagnn_recall:.4f}")
    print(f"F1 Score: {sagnn_f1:.4f}")

    print("\nHyperGAT Performance:")
    print(f"AUC: {hypergat_auc:.4f}")
    print(f"Precision: {hypergat_precision:.4f}")
    print(f"Recall: {hypergat_recall:.4f}")
    print(f"F1 Score: {hypergat_f1:.4f}")
```

I need to test how well a link prediction algorithm performs on the hypergraph when some hyperedges are hidden. 1. As input I should have H and the hyperedges to hide, so I can test how many links the algorithm predicts. Then I check how many predicted links correspond to links that exist in H: how many of the total does it manage to find? 2. Which predicted links do the algorithms have in common? 3. How important are the correctly predicted nodes?

Answer

Here is how to proceed, step by step, to test the quality of the link prediction algorithm with hidden hyperedges, taking your three goals into account:


Step 1: Input and data splitting

Input:

  • Original hypergraph: H
  • Hyperedges to hide: hidden_hyperedges

How to proceed:

  • Start from the complete hypergraph H.

  • Hide a set of hyperedges (hidden_hyperedges) by creating a new, reduced hypergraph H_train:

```python
hidden_hyperedges = ["CF2", "CF5", "CF10"]  # example of hidden hyperedges
visible_edges = {key: val for key, val in H.incidence_dict.items()
                 if key not in hidden_hyperedges}
H_train = hnx.Hypergraph(visible_edges)
```

  • Use H_train to train the link prediction algorithm (a sketch follows this list).

  • The goal is to predict the hidden hyperedges (hidden_hyperedges).
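A minimal sketch of the training step, assuming the functions pasted above (`run_hypergraph_link_prediction`, etc.) are in scope and that `H_train` was built as shown; the hidden hyperedges never enter this run's train/validation/test splits:

```python
# Sketch only: reuse the pipeline from the question, but feed it the reduced
# hypergraph so the hidden hyperedges are never seen during training.
results = run_hypergraph_link_prediction(H_train)
sagnn_model, hypergat_model = results[0], results[1]
```

Note that, with the pipeline as written, pairs that occur only in the hidden hyperedges can still be drawn as negatives by `generate_negative_samples`; for a stricter protocol you may want to exclude them there as well.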


Step 2: Evaluate how many hidden links the algorithm can predict

After generating the predictions, check how many of the predicted links correspond to links contained in the hidden hyperedges.

How to proceed:

  • Predict a set of candidate node pairs (e.g., the top-50 links with the highest scores); a sketch of how to produce them from a trained model follows the placeholder below.

```python
# example of predictions, taken from the model output:
predicted_links = [["P1", "J2"], ["P3", "T6"], ...]
```
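The snippet above is only a placeholder. A minimal sketch of how to actually produce `predicted_links` from one of the trained models, assuming `sagnn_model` was trained on `H_train` as above; the helper name `predict_top_k_links` is hypothetical, while `hyperedges_to_pairwise`, `combinations`, and `device` come from the pasted code:

```python
def predict_top_k_links(model, node_to_id, observed_pairs, k=50):
    """Score every unseen node pair with a trained model and return the top-k."""
    id_to_node = {i: n for n, i in node_to_id.items()}
    candidates = [(i, j) for i, j in combinations(range(len(node_to_id)), 2)
                  if (i, j) not in observed_pairs and (j, i) not in observed_pairs]

    model.eval()
    with torch.no_grad():
        scores = model(torch.LongTensor(candidates).to(device)).cpu().numpy()

    # Rank candidate pairs by predicted score and keep the k best
    ranked = sorted(zip(candidates, scores), key=lambda x: x[1], reverse=True)[:k]
    return [[id_to_node[i], id_to_node[j]] for (i, j), _ in ranked]


pairwise_edges, node_to_id, _ = hyperedges_to_pairwise(H_train)
observed_pairs = set(map(tuple, pairwise_edges.tolist()))
predicted_links = predict_top_k_links(sagnn_model, node_to_id, observed_pairs, k=50)
```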
  • Turn the hidden hyperedges into sets of actual node pairs:

```python
def hyperedges_to_pairs(hyperedges_dict):
    pairs = set()
    for edge_nodes in hyperedges_dict.values():
        for pair in combinations(edge_nodes, 2):
            pairs.add(tuple(sorted(pair)))
    return pairs

hidden_pairs = hyperedges_to_pairs({k: H.incidence_dict[k] for k in hidden_hyperedges})
```
  • Evaluate how many predictions are correct (a complementary precision check follows the snippet):

```python
predicted_set = set(tuple(sorted(link)) for link in predicted_links)
correct_predictions = predicted_set.intersection(hidden_pairs)
recall_hidden = len(correct_predictions) / len(hidden_pairs)  # fraction of hidden pairs recovered
print(f"Correctly predicted links: {len(correct_predictions)} out of {len(hidden_pairs)} total.")
```
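The ratio above measures how many of the hidden pairs were recovered, which is a recall-style quantity. A complementary check, using the same variables, is the precision over the prediction list:

```python
# What fraction of the predicted links are actually hidden pairs?
precision_pred = len(correct_predictions) / len(predicted_set) if predicted_set else 0.0
print(f"Precision over the predicted links: {precision_pred:.3f} "
      f"({len(correct_predictions)} correct out of {len(predicted_set)} predicted)")
```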

Step 3: Identify predicted links shared by different algorithms

If you have several algorithms (e.g., HyperSAGNN and HyperGAT):

  • Get the links predicted by each algorithm separately:

```python
predictions_sagnn = {("P1", "J2"), ("P3", "T6"), ...}
predictions_gat = {("P1", "J2"), ("P4", "T7"), ...}

# intersection of the predictions
common_predictions = predictions_sagnn.intersection(predictions_gat)
print(f"Predicted links shared by the algorithms: {common_predictions}")
```

  • These shared links represent "consistent" predictions that are potentially more reliable; a way to quantify the agreement is sketched below.
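To put a number on that consistency, a small sketch (the helper name `prediction_overlap` is hypothetical) that reports both the shared links and the Jaccard overlap of the two prediction sets:

```python
def prediction_overlap(preds_a, preds_b):
    """Return the shared predictions and the Jaccard overlap of two prediction sets."""
    a = {tuple(sorted(p)) for p in preds_a}
    b = {tuple(sorted(p)) for p in preds_b}
    union = a | b
    common = a & b
    return common, (len(common) / len(union) if union else 0.0)


common, jaccard = prediction_overlap(predictions_sagnn, predictions_gat)
print(f"{len(common)} shared predictions, Jaccard overlap = {jaccard:.2f}")
```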

Step 4: Assess the importance of the correctly predicted nodes

To understand the importance of the correctly predicted nodes you can consider:

  • Node centrality: evaluate the centrality of the correctly predicted nodes, e.g., degree centrality (example below) or eigenvector centrality (sketched right after it).
  • Functional meaning: check whether the correctly predicted nodes play key roles in the original hypergraph (e.g., critical roles such as hubs or connectors).

Example of a degree-based centrality analysis:

```python
correct_nodes = set(n for link in correct_predictions for n in link)

# Degree centrality of the nodes: fraction of hyperedges in H that contain each node
# (computed from the incidence dict rather than relying on a specific HyperNetX helper)
num_edges = len(H.incidence_dict)
all_nodes = {n for nodes in H.incidence_dict.values() for n in nodes}
centrality = {node: sum(node in nodes for nodes in H.incidence_dict.values()) / num_edges
              for node in all_nodes}

importance_correct_nodes = {node: centrality[node] for node in correct_nodes}
sorted_importance = sorted(importance_correct_nodes.items(), key=lambda x: x[1], reverse=True)

print("Importance of the correctly predicted nodes:")
for node, importance in sorted_importance:
    print(f"Node: {node}, Centrality: {importance:.3f}")
```
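For the eigenvector centrality mentioned above, one common option (a minimal sketch, assuming NetworkX is installed) is to build the 2-section graph of H, i.e. the ordinary graph in which two nodes are connected whenever they share at least one hyperedge, and compute standard centralities there:

```python
import networkx as nx
from itertools import combinations

# 2-section (clique expansion) of the hypergraph: connect every pair of nodes
# that co-occur in at least one hyperedge.
G = nx.Graph()
for nodes in H.incidence_dict.values():
    G.add_edges_from(combinations(nodes, 2))

deg = nx.degree_centrality(G)
eig = nx.eigenvector_centrality(G, max_iter=1000)

for node in sorted(correct_nodes, key=lambda n: eig.get(n, 0.0), reverse=True):
    print(f"Node: {node}, degree: {deg.get(node, 0.0):.3f}, eigenvector: {eig.get(node, 0.0):.3f}")
```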

Summary of what to do concretely in the code:

  • Hide hyperedges from H to create H_train.
  • Train the model on H_train and obtain predictions.
  • Compare the predictions with the hidden hyperedges to determine how many links the algorithm predicts correctly (recall over the hidden pairs).
  • Evaluate the intersection of the predictions of several algorithms.
  • Compute the centrality and functional importance of the correctly predicted nodes.

This strategy will give you:

  • Quantitative measures of your algorithm's effectiveness.
  • An understanding of how robust the predictions are, through the agreement between algorithms.
  • A qualitative assessment of the importance of the correctly predicted nodes.

In this way, you will have a complete picture of the reliability and effectiveness of your hypergraph link prediction methodology.
