Improving LSTM Performance
Created: January 17, 2025
Improving a time-series model’s accuracy typically involves multiple dimensions, including data preprocessing, model architecture, hyperparameter tuning, and training strategy. Below are several suggestions you can try. You might not need to do all of them—start with small changes and see which ones most effectively improve performance.
Model capacity: Increase hidden_size from 32 to 64 or 128, and/or stack more layers (num_layers=2 or 3). However, be mindful: too large a model may overfit if you have limited data. Use a validation set or cross-validation to monitor overfitting.
Dropout: Apply dropout on the LSTM. For instance:
self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, dropout=0.2)
(Note that if num_layers=1, PyTorch’s LSTM ignores the dropout parameter, so consider num_layers > 1 if you want dropout to be active within the LSTM layers.)
Weight decay: Use a small weight decay in your optimizer to reduce overfitting, for example:
optimizer = optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-5)
Learning rate: lr=1e-3 may be too high or too low. Try different learning rates (e.g., 1e-2, 5e-4, 1e-4) and see what yields the best validation loss.

Right now, you’re defining accuracy in the scaled space as:
```python
epsilon = 0.1
diff = np.linalg.norm(predictions - y_test_np, axis=1)
accuracy = np.mean(diff < epsilon)
```
A norm difference less than 0.1 on a 2D scaled vector might be quite strict or too lenient, depending on the scale of your data. Consider per-feature thresholds instead:

```python
epsilon_feature1 = 0.1  # tune per feature
epsilon_feature2 = 0.1
accuracy_feature1 = np.mean(np.abs(predictions[:, 0] - y_test_np[:, 0]) < epsilon_feature1)
accuracy_feature2 = np.mean(np.abs(predictions[:, 1] - y_test_np[:, 1]) < epsilon_feature2)
```

Then average these or interpret them individually.
Below is an example of how you might modify your code in small steps.
Increase hidden size and layers, and add dropout:
```python
class LSTMModel(nn.Module):
    def __init__(self, input_size=2, hidden_size=64, num_layers=2, output_size=2, dropout=0.2):
        super(LSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers=num_layers,
                            batch_first=True, dropout=dropout)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        # Initialize hidden states
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size)
        out, _ = self.lstm(x, (h0, c0))  # out shape: (batch, seq_len, hidden_size)
        out = out[:, -1, :]              # only last time step
        out = self.fc(out)
        return out
```
Use weight decay and possibly tweak lr:
optimizer = optim.Adam(model.parameters(), lr=5e-4, weight_decay=1e-5)
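If hand-tuning the learning rate gets tedious, a scheduler can lower it automatically when the validation loss plateaus. A minimal sketch using PyTorch's ReduceLROnPlateau (assuming the optimizer above and a val_loss computed once per epoch):

```python
from torch.optim.lr_scheduler import ReduceLROnPlateau

# Halve the learning rate if validation loss hasn't improved for 5 epochs
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5)

# ... inside the training loop, after computing val_loss each epoch:
# scheduler.step(val_loss)
```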
Train for more epochs or use early stopping:
```python
best_val_loss = float('inf')

for epoch in range(100):
    model.train()
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()

    # Validation step (if you have a validation set)
    model.eval()
    with torch.no_grad():
        val_outputs = model(X_val_tensors)
        val_loss = criterion(val_outputs, y_val_tensors).item()

    # Early stopping check
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        # Save best model if you like
    else:
        # If val_loss didn't improve, consider stopping or adjusting
        pass

    # Print progress
    if (epoch + 1) % 10 == 0:
        print(f"Epoch [{epoch+1}/100], Train Loss: {loss.item():.6f}, Val Loss: {val_loss:.6f}")
```
Consider standard scaling:
```python
from sklearn.preprocessing import StandardScaler

scaler = StandardScaler()
scaled_data = scaler.fit_transform(df[['col1', 'col2']])
# Keep the rest of your code the same
```
Adjust your “accuracy” threshold or remove it altogether, and rely on RMSE/MAE if that is more relevant for your problem.
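For reference, a minimal sketch of RMSE and MAE in the scaled space (assuming the predictions and y_test_np arrays from your evaluation code):

```python
import numpy as np

# Root-mean-squared error and mean absolute error over all test points
rmse = np.sqrt(np.mean((predictions - y_test_np) ** 2))
mae = np.mean(np.abs(predictions - y_test_np))
print(f"RMSE: {rmse:.6f}, MAE: {mae:.6f}")
```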
Incrementally applying these tips should help you improve your model’s performance and push that 17% accuracy closer to a more acceptable range.
Below is a complete code example showing how you could incorporate grid search for hyperparameter tuning and early stopping into your current workflow. In this example, we do the following:

- Split the data 70/10/20 into train, validation, and test sets.
- Run a simple manual grid search over hidden_size, num_layers, learning rate, and batch size.
- Train each candidate with early stopping on validation loss.
- Evaluate with a custom accuracy function (calculate_accuracy) with a 10% threshold.

Feel free to modify the splits or the hyperparameter search space to fit your exact needs.
```python
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import TensorDataset, DataLoader
import matplotlib.pyplot as plt

############################################
# 1. Load data
############################################
df = pd.read_csv("realDataForTrain.csv")

############################################
# 2. Normalize
############################################
scaler = MinMaxScaler()
scaled_data = scaler.fit_transform(df[['col1', 'col2']])

############################################
# 3. Create sequences
############################################
SEQ_LENGTH = 10

def create_sequences(data, seq_length=10):
    X, y = [], []
    for i in range(len(data) - seq_length):
        X.append(data[i : i + seq_length])
        y.append(data[i + seq_length])
    return np.array(X), np.array(y)

X, y = create_sequences(scaled_data, SEQ_LENGTH)

# Let's do 70% train, 10% validation, 20% test
train_size = int(len(X) * 0.7)
val_size = int(len(X) * 0.1)
test_size = len(X) - train_size - val_size

# Indices
X_train = X[:train_size]
y_train = y[:train_size]
X_val = X[train_size : train_size + val_size]
y_val = y[train_size : train_size + val_size]
X_test = X[train_size + val_size:]
y_test = y[train_size + val_size:]

# Convert to tensors
X_train_tensors = torch.tensor(X_train, dtype=torch.float32)
y_train_tensors = torch.tensor(y_train, dtype=torch.float32)
X_val_tensors = torch.tensor(X_val, dtype=torch.float32)
y_val_tensors = torch.tensor(y_val, dtype=torch.float32)
X_test_tensors = torch.tensor(X_test, dtype=torch.float32)
y_test_tensors = torch.tensor(y_test, dtype=torch.float32)

############################################
# 4. Define LSTM model
############################################
class LSTMModel(nn.Module):
    def __init__(self, input_size=2, hidden_size=32, num_layers=1, output_size=2):
        super(LSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        # Initialize hidden state and cell state
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size)
        out, _ = self.lstm(x, (h0, c0))  # out: (batch, seq_length, hidden_size)
        out = out[:, -1, :]              # Take the last time step
        out = self.fc(out)               # Linear layer
        return out

############################################
# 5. Accuracy calculation (10% threshold)
############################################
def calculate_accuracy(y_true, y_pred, threshold=0.10):
    """
    Calculate accuracy based on how many predictions are within
    a given percentage threshold of the true values.
    """
    # y_true and y_pred shape: (batch, 2)
    within_tolerance = np.abs(y_true - y_pred) <= threshold * np.abs(y_true)
    # within_tolerance is a boolean array of shape (batch, 2)
    accuracy = np.mean(within_tolerance) * 100  # Convert to percentage
    return accuracy

############################################
# 6. Training routine with early stopping
############################################
def train_model(model, train_loader, val_data, criterion, optimizer,
                max_epochs=100, patience=10):
    """
    Train the model with early stopping on validation loss.

    Args:
        model: The PyTorch model
        train_loader: DataLoader for training
        val_data: (X_val_tensors, y_val_tensors)
        criterion: Loss function
        optimizer: Optimizer
        max_epochs: Maximum number of epochs
        patience: How many epochs to wait for improvement before stopping

    Returns:
        best_model: The best model (state) found
        best_val_loss: The validation loss of the best model
        history: A dict with training/validation loss for plotting
    """
    X_val_tensors, y_val_tensors = val_data
    best_val_loss = float('inf')
    epochs_no_improve = 0
    history = {'train_loss': [], 'val_loss': []}

    for epoch in range(max_epochs):
        # Train mode
        model.train()
        train_loss_sum = 0.0
        for X_batch, y_batch in train_loader:
            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()
            train_loss_sum += loss.item()
        train_loss = train_loss_sum / len(train_loader)

        # Validation
        model.eval()
        with torch.no_grad():
            val_outputs = model(X_val_tensors)
            val_loss = criterion(val_outputs, y_val_tensors).item()

        history['train_loss'].append(train_loss)
        history['val_loss'].append(val_loss)

        # Check for improvement
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_model_state = model.state_dict()  # save model state
            epochs_no_improve = 0
        else:
            epochs_no_improve += 1

        # Early stopping
        if epochs_no_improve >= patience:
            print(f"Early stopping on epoch {epoch+1}")
            break

        if (epoch + 1) % 5 == 0:
            print(f"Epoch [{epoch+1}/{max_epochs}] "
                  f"Train Loss: {train_loss:.6f}, Val Loss: {val_loss:.6f}")

    # Load the best model state
    model.load_state_dict(best_model_state)
    return model, best_val_loss, history

############################################
# 7. Grid search
############################################
# Prepare train DataLoader
train_dataset = TensorDataset(X_train_tensors, y_train_tensors)

# Hyperparameter grid
hidden_sizes = [32, 64]
num_layers_list = [1, 2]
learning_rates = [1e-3, 1e-4]
batch_sizes = [16, 32]

best_hparams = None
best_val_loss = float('inf')
best_model = None

# We will do a simple manual grid search
for hidden_size in hidden_sizes:
    for num_layers in num_layers_list:
        for lr in learning_rates:
            for batch_size in batch_sizes:
                print(f"\nTrying: hidden_size={hidden_size}, num_layers={num_layers}, "
                      f"lr={lr}, batch_size={batch_size}")

                # Create model
                model = LSTMModel(input_size=2, hidden_size=hidden_size,
                                  num_layers=num_layers, output_size=2)
                criterion = nn.MSELoss()
                optimizer = optim.Adam(model.parameters(), lr=lr)

                # Create DataLoader with the chosen batch_size
                train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

                # Train with early stopping
                model, val_loss, history = train_model(
                    model=model,
                    train_loader=train_loader,
                    val_data=(X_val_tensors, y_val_tensors),
                    criterion=criterion,
                    optimizer=optimizer,
                    max_epochs=50,  # you can increase if you want
                    patience=5      # early stopping patience
                )
                print(f"Finished. Val Loss = {val_loss:.6f}")

                # Check if this is the best so far
                if val_loss < best_val_loss:
                    best_val_loss = val_loss
                    best_hparams = (hidden_size, num_layers, lr, batch_size)
                    # Clone the model to keep it
                    best_model = LSTMModel(input_size=2, hidden_size=hidden_size,
                                           num_layers=num_layers, output_size=2)
                    best_model.load_state_dict(model.state_dict())
                    print("==> New best hyperparameters found.")

print(f"\nBest val loss = {best_val_loss:.6f} with hparams: hidden_size={best_hparams[0]}, "
      f"num_layers={best_hparams[1]}, lr={best_hparams[2]}, batch_size={best_hparams[3]}")

############################################
# 8. Final Evaluation on Test Set
############################################
best_model.eval()
with torch.no_grad():
    # Predictions in scaled space
    predictions = best_model(X_test_tensors).numpy()
    y_test_np = y_test_tensors.numpy()

# Inverse transform predictions and ground truth to original scale
predictions_inv = scaler.inverse_transform(predictions)
y_test_inv = scaler.inverse_transform(y_test_np)

# Calculate standard regression metrics in scaled space
mse = np.mean((predictions - y_test_np) ** 2)
rmse = np.sqrt(mse)
mae = np.mean(np.abs(predictions - y_test_np))

# R^2 in scaled space
ss_res = np.sum((y_test_np - predictions) ** 2)
ss_tot = np.sum((y_test_np - np.mean(y_test_np, axis=0)) ** 2)
r2 = 1 - (ss_res / ss_tot)

print("\n=== Test Set Performance (Scaled Space) ===")
print(f"MSE : {mse:.6f}")
print(f"RMSE: {rmse:.6f}")
print(f"MAE : {mae:.6f}")
print(f"R^2 : {r2:.6f}")

# Calculate custom accuracy in scaled space (element-wise within 10% of y_true)
accuracy_10pct = calculate_accuracy(y_test_np, predictions, threshold=0.10)
print(f"Accuracy (10% threshold in scaled space): {accuracy_10pct:.2f}%")

############################################
# 9. Predict the next 10 values
############################################
def predict_future(model, data, scaler, seq_length=10, steps=10):
    model.eval()
    window = data[-seq_length:].copy()
    preds = []
    for _ in range(steps):
        x_t = torch.tensor(window[np.newaxis, :, :], dtype=torch.float32)
        with torch.no_grad():
            pred = model(x_t)
        pred_np = pred.numpy()[0]
        preds.append(pred_np)
        # Update the "window"
        window = np.vstack([window[1:], pred_np])
    preds = np.array(preds)
    return scaler.inverse_transform(preds)

future_preds = predict_future(best_model, scaled_data, scaler,
                              seq_length=SEQ_LENGTH, steps=10)
print("\nNext 10 predictions (col1, col2):")
for i, (p1, p2) in enumerate(future_preds, start=1):
    print(f"Step {i}: col1={p1:.4f}, col2={p2:.4f}")

############################################
# 10. (Optional) Plot
############################################
plt.figure(figsize=(12, 4))
test_index = range(train_size + val_size + SEQ_LENGTH,
                   train_size + val_size + SEQ_LENGTH + len(y_test_inv))
plt.plot(test_index, y_test_inv[:, 0], label="Actual col1", color="blue")
plt.plot(test_index, predictions_inv[:, 0], label="Predicted col1", color="red")
plt.title("col1 Actual vs Predicted (Test Set)")
plt.legend()
plt.show()
```
Train/Val/Test Split
We changed the split to 70/10/20, so we’d have a dedicated validation set for early stopping and hyperparameter tuning. If you want to keep your 80/20 train/test split exactly, you can carve the validation set out of the training split.
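For instance, a minimal sketch of carving the validation set out of an 80/20 train/test split (assuming the X and y sequence arrays built in the code above; the 10% validation fraction is an arbitrary choice):

```python
# Keep the original 80/20 train/test boundary
train_end = int(len(X) * 0.8)
X_trainval, y_trainval = X[:train_end], y[:train_end]
X_test, y_test = X[train_end:], y[train_end:]

# Use the last 10% of the training portion for validation (keeps temporal order)
val_start = int(len(X_trainval) * 0.9)
X_train, y_train = X_trainval[:val_start], y_trainval[:val_start]
X_val, y_val = X_trainval[val_start:], y_trainval[val_start:]
```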
Grid Search
We search over hidden_size, num_layers, learning rate, and batch size; you can extend the grid with other options (e.g., dropout or different optimizers).

Early Stopping
train_model takes a patience parameter that stops training if the validation loss does not improve for patience consecutive epochs.

Accuracy Function (calculate_accuracy)
Counts a prediction as correct when it falls within 10% of the true value, computed element-wise over col1 and col2.

Final Model on Test Set
The best model found during grid search is evaluated once on the held-out test set.

Predict Future
The predict_future function is unchanged from your original code, but here we apply it to the best model found from grid search.

Feel free to adjust any details (e.g., patience, max_epochs, or hyperparameter grid) to suit your data and to find the best performance.
Below is a comprehensive list of suggestions and code modifications you can try in order to significantly boost your accuracy (with the 10% threshold definition). The main takeaways: prepare the data more carefully (separate scaling, outliers), strengthen the model (stacked LSTM, dropout), and rethink the threshold-based accuracy metric.
Important: Achieving an 80%+ accuracy with the threshold-based metric can be challenging depending on the data’s volatility, magnitude differences, etc. If your data varies greatly (e.g., col2 ranges up to 100, while col1 is around ±1 or so), you may need to treat each column differently (e.g., scale or transform each column separately, or compute accuracy per column). Nonetheless, let’s walk through the main steps.
Separate Scaling for Each Column
If col2 is on a much different scale than col1, using the same MinMaxScaler for both might distort one relative to the other. Instead, consider:
```python
scaler_col1 = MinMaxScaler()
scaler_col2 = MinMaxScaler()

data_col1 = df[['col1']].values
data_col2 = df[['col2']].values

data_col1_scaled = scaler_col1.fit_transform(data_col1)
data_col2_scaled = scaler_col2.fit_transform(data_col2)

# Combine them back or keep them separate
scaled_data = np.hstack([data_col1_scaled, data_col2_scaled])
```
Check Outliers
If there are extreme outliers in col1 or col2, the LSTM might “overfit” to those points. Consider removing or clipping extreme outliers if they are not valid data points.

Temporal/Seasonal Features
If your series has recurring temporal patterns, time-based input features can help the model pick them up.

Smoothing
Light smoothing (e.g., a rolling average) can reduce noise and make the underlying pattern easier to learn; see the sketch below.
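A minimal smoothing sketch (the window size of 5 is an arbitrary choice to tune against your data):

```python
import pandas as pd

# Hypothetical example: replace col1/col2 with rolling means before scaling.
# min_periods=1 avoids NaNs at the start of the series.
df_smooth = df.copy()
df_smooth[['col1', 'col2']] = df[['col1', 'col2']].rolling(window=5, min_periods=1).mean()
```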
To get higher accuracy, especially if your dataset is large or has complex patterns:
Use a Larger Hidden Size
For example, hidden_size=64 or 128.

Use Multiple LSTM Layers (Stacked LSTM)
num_layers=2 or 3 can capture more complex temporal dependencies.

Add Dropout

```python
self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, dropout=0.2)
```

Remember that dropout only works if num_layers > 1 in PyTorch’s LSTM.

Try GRU
A GRU can sometimes train faster and yield better performance. Simply replace nn.LSTM with nn.GRU (with minimal code changes); see the sketch after this list.

Tune the Learning Rate
If lr=1e-3 is not working, try 5e-4, 1e-4, or even bigger like 5e-3. A scheduler can also help (e.g., ReduceLROnPlateau).

Add Weight Decay
A small weight decay (e.g., weight_decay=1e-5) can improve generalization.
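As referenced above, a minimal GRU sketch (mirroring the LSTM model used in this discussion; unlike an LSTM, a GRU has no cell state, so there is no (h0, c0) tuple to manage):

```python
import torch.nn as nn

class GRUModel(nn.Module):
    def __init__(self, input_size=2, hidden_size=64, num_layers=2, output_size=2, dropout=0.2):
        super(GRUModel, self).__init__()
        self.gru = nn.GRU(input_size, hidden_size, num_layers=num_layers,
                          batch_first=True, dropout=dropout)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        out, _ = self.gru(x)           # GRU returns (output, h_n) only
        return self.fc(out[:, -1, :])  # last time step -> linear head
```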
For the accuracy metric, you’re using:

```python
def calculate_accuracy(y_true, y_pred, threshold=0.10):
    within_tolerance = np.abs(y_true - y_pred) <= threshold * np.abs(y_true)
    accuracy = np.mean(within_tolerance) * 100
    return accuracy
```
Tips:

- If y_true values are near zero, the term threshold * np.abs(y_true) becomes very small, making it almost impossible to stay within 10%. Consider a relative error with a stabilized denominator (e.g., (pred - true) / (mean_of_true + 1)); see the sketch below.
- Compute accuracy separately per column. You may find one column far more accurate than the other (e.g., 30% for col2 while col1 fares much better, or vice versa). Then you see which column is the bottleneck.
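As mentioned, a minimal sketch of a tolerance metric that stays meaningful near zero (the eps value is an arbitrary choice to tune for your data’s scale):

```python
import numpy as np

def calculate_accuracy_stable(y_true, y_pred, threshold=0.10, eps=1e-3):
    # Stabilize the denominator so near-zero targets don't make the
    # tolerance band collapse to zero width
    rel_err = np.abs(y_true - y_pred) / (np.abs(y_true) + eps)
    return np.mean(rel_err <= threshold) * 100
```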
Below is a refined version of the code you already have, with the following changes:

- Separate scaling for col1 and col2.
- A stacked LSTM with dropout and an optional grid search.
- Early stopping (patience=10).

You can incrementally tweak these parameters (hidden_size, num_layers, dropout, lr, etc.) and see if you approach or exceed that 80% threshold accuracy.
```python
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import TensorDataset, DataLoader
import matplotlib.pyplot as plt

############################################
# 1. Load data
############################################
df = pd.read_csv("realDataForTrain.csv")

############################################
# 2. Separate scaling for col1 and col2
############################################
col1_data = df[['col1']].values  # shape: (N, 1)
col2_data = df[['col2']].values  # shape: (N, 1)

scaler_col1 = MinMaxScaler()
scaler_col2 = MinMaxScaler()

col1_scaled = scaler_col1.fit_transform(col1_data)  # shape (N, 1)
col2_scaled = scaler_col2.fit_transform(col2_data)  # shape (N, 1)

# Combine scaled columns
scaled_data = np.hstack([col1_scaled, col2_scaled])  # shape (N, 2)

############################################
# 3. Create sequences
############################################
SEQ_LENGTH = 10

def create_sequences(data, seq_length=10):
    X, y = [], []
    for i in range(len(data) - seq_length):
        X.append(data[i : i + seq_length])
        y.append(data[i + seq_length])  # next point
    return np.array(X), np.array(y)

X, y = create_sequences(scaled_data, SEQ_LENGTH)

# We do 70% train, 10% val, 20% test
train_size = int(0.7 * len(X))
val_size = int(0.1 * len(X))
test_size = len(X) - train_size - val_size

X_train = X[:train_size]
y_train = y[:train_size]
X_val = X[train_size : train_size + val_size]
y_val = y[train_size : train_size + val_size]
X_test = X[train_size + val_size:]
y_test = y[train_size + val_size:]

# Convert to tensors
X_train_tensors = torch.tensor(X_train, dtype=torch.float32)
y_train_tensors = torch.tensor(y_train, dtype=torch.float32)
X_val_tensors = torch.tensor(X_val, dtype=torch.float32)
y_val_tensors = torch.tensor(y_val, dtype=torch.float32)
X_test_tensors = torch.tensor(X_test, dtype=torch.float32)
y_test_tensors = torch.tensor(y_test, dtype=torch.float32)

############################################
# 4. Define LSTM Model (Stacked + Dropout)
############################################
class LSTMModel(nn.Module):
    def __init__(self, input_size=2, hidden_size=64, num_layers=2, output_size=2, dropout=0.2):
        super(LSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(
            input_size,
            hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout
        )
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        # Initialize hidden state and cell state with zeros
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size)
        out, _ = self.lstm(x, (h0, c0))  # out: (batch, seq_len, hidden_size)
        out = out[:, -1, :]              # last time step
        out = self.fc(out)               # linear
        return out

############################################
# 5. Accuracy function (10% threshold)
############################################
def calculate_accuracy(y_true, y_pred, threshold=0.10):
    """
    y_true, y_pred: shape (batch, 2)
    We'll calculate the fraction of points where
      |y_pred - y_true| <= threshold * |y_true|
    then convert to a percentage.
    """
    within_tolerance = np.abs(y_true - y_pred) <= threshold * np.abs(y_true)
    accuracy = np.mean(within_tolerance) * 100
    return accuracy

############################################
# 6. Early Stopping Training Routine
############################################
def train_model(model, train_loader, val_data, criterion, optimizer,
                max_epochs=100, patience=10):
    X_val_tensors, y_val_tensors = val_data
    best_val_loss = float('inf')
    epochs_no_improve = 0

    for epoch in range(max_epochs):
        # Training
        model.train()
        train_loss_sum = 0
        for X_batch, y_batch in train_loader:
            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()
            train_loss_sum += loss.item()
        train_loss = train_loss_sum / len(train_loader)

        # Validation
        model.eval()
        with torch.no_grad():
            val_outputs = model(X_val_tensors)
            val_loss = criterion(val_outputs, y_val_tensors).item()

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_model_state = model.state_dict()
            epochs_no_improve = 0
        else:
            epochs_no_improve += 1

        if epochs_no_improve >= patience:
            print(f"Early stopping at epoch {epoch+1}")
            break

        if (epoch + 1) % 5 == 0:
            print(f"Epoch [{epoch+1}/{max_epochs}] - Train Loss: {train_loss:.6f}, Val Loss: {val_loss:.6f}")

    # Load best model
    model.load_state_dict(best_model_state)
    return model, best_val_loss

############################################
# 7. Grid Search (Optional)
############################################
train_dataset = TensorDataset(X_train_tensors, y_train_tensors)

# Let's do a small grid for demonstration
hidden_sizes = [64, 128]      # bigger hidden sizes
num_layers_list = [1, 2]      # stacked or not
learning_rates = [1e-3, 1e-4]
batch_sizes = [32, 64]
dropouts = [0.0, 0.2]

best_val_loss = float('inf')
best_model = None
best_hparams = None

for hs in hidden_sizes:
    for nl in num_layers_list:
        for lr in learning_rates:
            for bs in batch_sizes:
                for dp in dropouts:
                    print(f"\nTesting hyperparams: hs={hs}, nl={nl}, lr={lr}, bs={bs}, dropout={dp}")

                    # Create model
                    model = LSTMModel(input_size=2, hidden_size=hs, num_layers=nl, dropout=dp)
                    criterion = nn.MSELoss()
                    optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=1e-5)

                    # Create DataLoader
                    train_loader = DataLoader(train_dataset, batch_size=bs, shuffle=True)

                    # Train
                    model, val_loss = train_model(
                        model=model,
                        train_loader=train_loader,
                        val_data=(X_val_tensors, y_val_tensors),
                        criterion=criterion,
                        optimizer=optimizer,
                        max_epochs=100,  # can increase
                        patience=10
                    )
                    print(f"Val Loss: {val_loss:.6f}")

                    if val_loss < best_val_loss:
                        best_val_loss = val_loss
                        best_hparams = (hs, nl, lr, bs, dp)
                        # Save best model
                        best_model = LSTMModel(input_size=2, hidden_size=hs, num_layers=nl, dropout=dp)
                        best_model.load_state_dict(model.state_dict())
                        print(" ==> New best found!")

print(f"\nBest val loss = {best_val_loss:.6f} with hparams: {best_hparams}")

############################################
# 8. Evaluate on Test
############################################
best_model.eval()
with torch.no_grad():
    test_preds_scaled = best_model(X_test_tensors).numpy()  # shape (test_samples, 2)
    y_test_np = y_test_tensors.numpy()                      # shape (test_samples, 2)

# Inverse transform each column separately
# test_preds_scaled[:, 0] -> col1, test_preds_scaled[:, 1] -> col2
test_preds_col1 = scaler_col1.inverse_transform(test_preds_scaled[:, [0]])
test_preds_col2 = scaler_col2.inverse_transform(test_preds_scaled[:, [1]])
test_preds_inv = np.hstack([test_preds_col1, test_preds_col2])

# Same for y_test_np
y_test_col1 = scaler_col1.inverse_transform(y_test_np[:, [0]])
y_test_col2 = scaler_col2.inverse_transform(y_test_np[:, [1]])
y_test_inv = np.hstack([y_test_col1, y_test_col2])

# 8.1 - MSE, RMSE, MAE in *scaled* space
mse_scaled = np.mean((test_preds_scaled - y_test_np)**2)
rmse_scaled = np.sqrt(mse_scaled)
mae_scaled = np.mean(np.abs(test_preds_scaled - y_test_np))

# 8.2 - R^2 in scaled space
ss_res = np.sum((y_test_np - test_preds_scaled)**2)
ss_tot = np.sum((y_test_np - np.mean(y_test_np, axis=0))**2)
r2_scaled = 1 - (ss_res / ss_tot)

print("\n=== Test Set Performance (Scaled Space) ===")
print(f"MSE : {mse_scaled:.6f}")
print(f"RMSE: {rmse_scaled:.6f}")
print(f"MAE : {mae_scaled:.6f}")
print(f"R^2 : {r2_scaled:.6f}")

# 8.3 - Accuracy in scaled space (10% threshold)
acc_10pct = calculate_accuracy(y_test_np, test_preds_scaled, threshold=0.10)
print(f"Accuracy (10% threshold, scaled space, both columns): {acc_10pct:.2f}%")

# 8.4 - (Optional) Accuracy for each column separately
col1_acc_10pct = calculate_accuracy(y_test_np[:, [0]], test_preds_scaled[:, [0]], threshold=0.10)
col2_acc_10pct = calculate_accuracy(y_test_np[:, [1]], test_preds_scaled[:, [1]], threshold=0.10)
print(f"Accuracy col1 (10% threshold): {col1_acc_10pct:.2f}%")
print(f"Accuracy col2 (10% threshold): {col2_acc_10pct:.2f}%")

############################################
# 9. Predict the next 10 values
############################################
def predict_future(model, data_scaled_col1, data_scaled_col2,
                   scaler_col1, scaler_col2, seq_length=10, steps=10):
    """
    data_scaled_col1, data_scaled_col2 are 1D arrays of the scaled col1, col2.
    We'll stack their last `seq_length` steps to feed the model.
    """
    model.eval()
    # Combine them into shape (N, 2)
    combined_scaled = np.hstack([
        data_scaled_col1.reshape(-1, 1),
        data_scaled_col2.reshape(-1, 1)
    ])
    window = combined_scaled[-seq_length:].copy()  # last seq_length rows
    preds = []
    for _ in range(steps):
        x_t = torch.tensor(window[np.newaxis, :, :], dtype=torch.float32)
        with torch.no_grad():
            pred = model(x_t).numpy()[0]  # shape (2,)
        preds.append(pred)
        # Shift the window
        window = np.vstack([window[1:], pred])

    preds = np.array(preds)  # shape (steps, 2)
    # Inverse transform each column
    pred_col1_inv = scaler_col1.inverse_transform(preds[:, [0]])
    pred_col2_inv = scaler_col2.inverse_transform(preds[:, [1]])
    preds_inv = np.hstack([pred_col1_inv, pred_col2_inv])
    return preds_inv

future_preds = predict_future(best_model, col1_scaled, col2_scaled,
                              scaler_col1, scaler_col2,
                              seq_length=SEQ_LENGTH, steps=10)
print("\nNext 10 predictions (col1, col2):")
for i, (p1, p2) in enumerate(future_preds, start=1):
    print(f"Step {i}: col1={p1:.4f}, col2={p2:.4f}")

############################################
# 10. Plot
############################################
plt.figure(figsize=(12, 4))
test_index = range(train_size + val_size + SEQ_LENGTH,
                   train_size + val_size + SEQ_LENGTH + len(y_test_inv))

# Plot col1
plt.plot(test_index, y_test_inv[:, 0], label="Actual col1", color="blue")
plt.plot(test_index, test_preds_inv[:, 0], label="Predicted col1", color="red")
plt.title("col1 Actual vs Predicted (Test Set)")
plt.legend()
plt.show()
```
Look at Per-Column Accuracy
If col2 has large swings (e.g., from near 0 up to 100), the ±10% band might be too narrow for higher accuracy. Possibly adjust the threshold for each column separately, or transform col2 (e.g., log-scale if it’s always positive).

Consider a Log or Other Transform
If col2 is always positive, you can transform it with log1p(col2) (log of col2 + 1), train the model, and then exponentiate predictions. This compresses outliers and might help the model get more consistent predictions; see the sketch after this list.

Feature Engineering
Consider adding derived inputs (e.g., lags or rolling statistics of col2).

Other directions worth trying: more advanced architectures, data-integrity checks, broader hyperparameter tuning, and longer training.
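As referenced in the list above, a minimal log-transform sketch for col2 (this assumes col2 is non-negative; predictions are mapped back with expm1):

```python
import numpy as np
from sklearn.preprocessing import MinMaxScaler

# Train on log1p(col2) to compress large swings
col2_log = np.log1p(df[['col2']].values)
scaler_col2 = MinMaxScaler()
col2_scaled = scaler_col2.fit_transform(col2_log)

# ... after prediction, undo both transforms:
# col2_pred = np.expm1(scaler_col2.inverse_transform(col2_pred_scaled))
```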
Hitting 80% accuracy (with your threshold-based metric) may require careful data scaling, transformations, a sufficiently powerful (yet well-regularized) model, and thorough hyperparameter tuning. Start by focusing on data (scaling and outliers), and systematically tune the LSTM architecture and training scheme. Good luck!
Below is one possible refactoring of your code that uses the 3-layer LSTM architecture you provided and predicts the next 10 time steps for 2 features (thus output_size = 20). This example demonstrates:

- Creating multi-step sequences for the two features (col1 and col2).
- Flattening each future window (the target y) to match the 20-dimensional output from the final dense layer.

Adjust any parameters (epochs, batch size, learning rate, etc.) as needed to see better performance.
```python
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import TensorDataset, DataLoader
import matplotlib.pyplot as plt

##############################################################################
# 1. Hyperparameters (from your snippet, feel free to tweak as needed)
##############################################################################
input_size = 2         # Two features: (col1, col2)
hidden_size = 64       # 64 hidden units in each LSTM layer
num_layers = 3         # 3-layer LSTM (stacked)
output_size = 20       # Predict 10 time steps * 2 features => 20
dropout = 0.5          # High dropout to reduce overfitting
learning_rate = 0.005
num_epochs = 9
batch_size = 64

##############################################################################
# 2. Load Data
##############################################################################
df = pd.read_csv("realDataForTrain.csv")

# We'll assume your CSV has columns named exactly 'col1' and 'col2'.
# If not, adjust accordingly.
data = df[['col1', 'col2']].values  # shape (N, 2)

##############################################################################
# 3. Scaling
##############################################################################
scaler = MinMaxScaler()
data_scaled = scaler.fit_transform(data)  # shape (N, 2)

##############################################################################
# 4. Create Multi-Step Sequences
##############################################################################
# We want to predict the next 10 time steps (each has 2 features).
# That means our target y will be shape (10 * 2) = 20 for each sample.

SEQ_LENGTH = 10        # how many past time steps we feed into LSTM
FORECAST_HORIZON = 10  # how many future steps to predict

def create_sequences_multistep(dataset, seq_length=10, forecast_horizon=10):
    X, y = [], []
    # We stop at len(dataset) - seq_length - forecast_horizon + 1
    for i in range(len(dataset) - seq_length - forecast_horizon + 1):
        # Past window
        past_window = dataset[i : i + seq_length]  # shape (seq_length, 2)
        # Next horizon steps (the future we want to predict)
        future = dataset[i + seq_length : i + seq_length + forecast_horizon]
        # shape (forecast_horizon, 2) => e.g. (10, 2)

        # Flatten the future to match output_size=20
        future_flat = future.flatten()  # shape (forecast_horizon*2,)

        X.append(past_window)
        y.append(future_flat)
    return np.array(X), np.array(y)

X, Y = create_sequences_multistep(
    data_scaled,
    seq_length=SEQ_LENGTH,
    forecast_horizon=FORECAST_HORIZON
)
# For example, X.shape = (samples, 10, 2), Y.shape = (samples, 20)

##############################################################################
# 5. Train/Val/Test Split
##############################################################################
train_size = int(0.7 * len(X))
val_size = int(0.1 * len(X))
test_size = len(X) - train_size - val_size

X_train = X[:train_size]
y_train = Y[:train_size]
X_val = X[train_size : train_size + val_size]
y_val = Y[train_size : train_size + val_size]
X_test = X[train_size + val_size:]
y_test = Y[train_size + val_size:]

# Convert to tensors
X_train_tensors = torch.tensor(X_train, dtype=torch.float32)  # (train_size, 10, 2)
y_train_tensors = torch.tensor(y_train, dtype=torch.float32)  # (train_size, 20)
X_val_tensors = torch.tensor(X_val, dtype=torch.float32)
y_val_tensors = torch.tensor(y_val, dtype=torch.float32)
X_test_tensors = torch.tensor(X_test, dtype=torch.float32)
y_test_tensors = torch.tensor(y_test, dtype=torch.float32)

##############################################################################
# 6. Define the 3-Layer LSTM Model (Multi-step Output)
##############################################################################
class LSTMModel(nn.Module):
    def __init__(self,
                 input_size,      # 2
                 hidden_size=50,  # 50 units in each LSTM layer
                 num_layers=3,    # accepted for compatibility; the three layers below are explicit
                 output_size=20,  # 10 time steps * 2 features
                 dropout=0.2):
        super(LSTMModel, self).__init__()
        # First LSTM layer (returns the full sequence so the next LSTM can see all timesteps)
        self.lstm1 = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.dropout1 = nn.Dropout(dropout)
        # Second LSTM layer (again returns the full sequence)
        self.lstm2 = nn.LSTM(hidden_size, hidden_size, batch_first=True)
        self.dropout2 = nn.Dropout(dropout)
        # Third LSTM layer (returns full sequence, we slice the last timestep)
        self.lstm3 = nn.LSTM(hidden_size, hidden_size, batch_first=True)
        self.dropout3 = nn.Dropout(dropout)
        # Final dense layer to map the last hidden state to your output size
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        # x shape: (batch, seq_length=10, input_size=2)
        # First layer
        out, _ = self.lstm1(x)
        out = self.dropout1(out)
        # Second layer
        out, _ = self.lstm2(out)
        out = self.dropout2(out)
        # Third layer
        out, _ = self.lstm3(out)
        out = self.dropout3(out)
        # We only need the last timestep's output for the final prediction
        out = out[:, -1, :]  # shape (batch, hidden_size)
        # Fully connected layer => shape (batch, output_size=20)
        out = self.fc(out)
        return out

##############################################################################
# 7. Instantiate Model
##############################################################################
model = LSTMModel(
    input_size=input_size,    # 2
    hidden_size=hidden_size,  # 64
    num_layers=num_layers,    # 3
    output_size=output_size,  # 20
    dropout=dropout           # 0.5
)
model.to(torch.device("cpu"))  # or "cuda" if you have a GPU

##############################################################################
# 8. Prepare for Training
##############################################################################
criterion = nn.MSELoss()  # Common for regression
optimizer = optim.Adam(model.parameters(), lr=learning_rate)  # 0.005

train_dataset = TensorDataset(X_train_tensors, y_train_tensors)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

##############################################################################
# 9. Training Loop
##############################################################################
for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        outputs = model(X_batch)  # shape (batch_size, 20)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    avg_loss = total_loss / len(train_loader)

    # Optional: Validation
    model.eval()
    with torch.no_grad():
        val_outputs = model(X_val_tensors)
        val_loss = criterion(val_outputs, y_val_tensors).item()

    print(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {avg_loss:.6f}, Val Loss: {val_loss:.6f}")

##############################################################################
# 10. Evaluate on Test Set
##############################################################################
model.eval()
with torch.no_grad():
    test_preds = model(X_test_tensors)  # shape (test_size, 20)
    test_loss = criterion(test_preds, y_test_tensors).item()
print(f"\nTest MSE Loss: {test_loss:.6f}")

# Convert predictions back to shape (test_size, FORECAST_HORIZON=10, 2)
test_preds_np = test_preds.numpy().reshape(-1, FORECAST_HORIZON, 2)
y_test_np = y_test_tensors.numpy().reshape(-1, FORECAST_HORIZON, 2)

# (Optional) Inverse transform each 2D time step.
# Because we flattened for training, we need to invert the scaling carefully.
# If you used a single scaler for both columns, we can do:
test_preds_2d = test_preds_np.reshape(-1, 2)  # shape (test_size * 10, 2)
y_test_2d = y_test_np.reshape(-1, 2)

test_preds_inv = scaler.inverse_transform(test_preds_2d)  # shape (total_predictions, 2)
y_test_inv = scaler.inverse_transform(y_test_2d)

# Reshape back to (test_size, 10, 2)
test_preds_inv = test_preds_inv.reshape(-1, FORECAST_HORIZON, 2)
y_test_inv = y_test_inv.reshape(-1, FORECAST_HORIZON, 2)

# Example: Plot the first test sample's 10 predicted points vs actual
plt.figure(figsize=(10, 4))
plt.plot(range(1, FORECAST_HORIZON + 1), y_test_inv[0, :, 0], label="Actual col1")
plt.plot(range(1, FORECAST_HORIZON + 1), test_preds_inv[0, :, 0], label="Pred col1")
plt.title("First Test Sample, col1 (10-step forecast)")
plt.legend()
plt.show()
```
create_sequences_multistep collects a 10-step input window (seq_length=10) and then the next 10 steps (forecast_horizon=10) as the target. Each target is flattened to (10 * 2) = 20 values, matching the output_size=20. At evaluation time, reshape predictions from (N, 20) back to (N, 10, 2) to interpret them as 10 future steps of 2D data.

A few parameter notes:

- The learning rate (lr=0.005) might be high for some datasets; experiment with smaller values like 0.001 or 0.0005.
- The number of epochs (num_epochs=9) might be too few for a deep LSTM. Try 20–50+ epochs, and watch for overfitting.
- The hidden size (64) can be changed to 32 or 128, depending on dataset size.
- The dropout (0.5) is fairly high; sometimes 0.2–0.3 is enough. Keep an eye on validation loss.

That’s it! This refactoring uses the 3-layer LSTM with dropout and a multi-step forecast output. Adjust the parameters (epochs, LR, hidden size, etc.) and keep an eye on validation metrics to refine performance.
Below is a refactored evaluation code snippet that calculates the same metrics—MSE, RMSE, MAE, R², and accuracy (with a threshold)—using the same formulas you showed in your sample. This snippet assumes:
- a trained model,
- a test_loader that yields (inputs, targets) batches,
- a function calculate_accuracy(...) that calculates accuracy based on a threshold (like 5% or 10%),
- and tensors moved to the appropriate device (CPU or GPU).

Feel free to insert this code in your main script wherever you evaluate your model.
```python
import numpy as np
import torch
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

# Example accuracy function (threshold in fraction, e.g., 0.10 for 10%)
def calculate_accuracy(y_true, y_pred, threshold=0.10):
    """
    Calculate accuracy based on how many predictions are within
    a given percentage threshold of the true values.
    Note: y_true and y_pred must be the same shape.
    """
    within_tolerance = np.abs(y_true - y_pred) <= threshold * np.abs(y_true)
    # Convert boolean array to percentage
    accuracy = np.mean(within_tolerance) * 100
    return accuracy

##############################################
# Evaluation on the Test Set
##############################################
model.eval()  # put model in evaluation mode

all_predictions = []
all_actuals = []

with torch.no_grad():
    for inputs, targets in test_loader:
        # Move data to device (CPU or GPU)
        inputs = inputs.to(device)
        targets = targets.to(device)

        # Generate predictions
        outputs = model(inputs)  # shape (batch, 20) if you're predicting 10 steps * 2 features

        # Collect for later metric computation
        all_predictions.append(outputs.cpu().numpy())
        all_actuals.append(targets.cpu().numpy())

# Concatenate all batches along axis=0
all_predictions = np.concatenate(all_predictions, axis=0)  # shape (num_samples, 20)
all_actuals = np.concatenate(all_actuals, axis=0)          # shape (num_samples, 20)

# If each sample is 10 time steps * 2 features => reshape to (num_samples, 10, 2)
all_predictions = all_predictions.reshape(-1, 10, 2)
all_actuals = all_actuals.reshape(-1, 10, 2)

# Flatten them again for standard regression metrics => shape (num_samples * 10 * 2,)
preds_flat = all_predictions.flatten()
actuals_flat = all_actuals.flatten()

# Calculate standard metrics
mse = mean_squared_error(actuals_flat, preds_flat)
rmse = np.sqrt(mse)
mae = mean_absolute_error(actuals_flat, preds_flat)
r2 = r2_score(actuals_flat, preds_flat)

# Calculate accuracy with threshold (e.g., 0.10 for 10%)
accuracy = calculate_accuracy(actuals_flat, preds_flat, threshold=0.10)

print(f"MSE: {mse:.4f}")
print(f"RMSE: {rmse:.4f}")
print(f"MAE: {mae:.4f}")
print(f"R^2: {r2:.4f}")
print(f"Accuracy (±10%): {accuracy:.2f}%")
```
Gather Predictions
We loop through the test_loader and collect the model outputs (all_predictions) and actual values (all_actuals) in separate lists.

Reshape
The concatenated batches have shape (num_samples, 20). We first reshape to (num_samples, 10, 2) so it’s easier to interpret, then flatten everything so sklearn.metrics can directly handle it.

Compute Metrics
MSE, RMSE, MAE, and R² come straight from the flattened arrays; the accuracy counts predictions that fall within threshold * abs(true_value) of the actual value. We then convert that fraction to a percentage.

Print
We print each metric to get a final overview of the model’s performance on the test set.
Note: make sure the calculate_accuracy threshold matches how you want to interpret being “within 5% or 10%.” If your data can be zero or very small in magnitude, you may need to handle that carefully (e.g., add a small epsilon or use a different metric).
Below is a step-by-step guide on how to integrate and use your new LSTMModel in a typical multi-step forecasting scenario. I will assume:

- You want a 10-step forecast of 2 features (output_size=20 = 10 * 2).
- Your data has two columns (col1, col2) and you want to train an LSTM to forecast the next 10 steps for both columns.

Feel free to copy & paste and then edit to match your file names, variable names, or paths.
```python
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import TensorDataset, DataLoader
import matplotlib.pyplot as plt

# If you have a GPU and want to use it:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)
```
```python
# Load your dataset
df = pd.read_csv("realDataForTrain.csv")  # Replace with your CSV path

# Let's assume your CSV has 'col1' and 'col2' columns
data = df[['col1', 'col2']].values  # shape (N, 2)
```
You can use a single MinMaxScaler for both columns. Alternatively, if the columns differ widely, you can scale them separately. For now, let’s use one:
```python
scaler = MinMaxScaler()
data_scaled = scaler.fit_transform(data)  # shape (N, 2)
```
We’ll define a function to create sequences of length SEQ_LENGTH (past) and then forecast the next 10 steps. Because you want 10 steps * 2 features = 20 outputs, we flatten the future window into a shape (20,).
Note: If you only want a single-step forecast, you’d change forecast_horizon (a one-line sketch follows the function below). But your code suggests you want 10 steps.
```python
SEQ_LENGTH = 10
FORECAST_HORIZON = 10  # the next 10 steps to predict => output_size=20

def create_sequences_multistep(dataset, seq_length=10, forecast_horizon=10):
    X, y = [], []
    for i in range(len(dataset) - seq_length - forecast_horizon + 1):
        # Past window (shape: (seq_length, 2))
        past_window = dataset[i : i + seq_length]
        # Future window => next 10 steps (shape: (forecast_horizon, 2))
        future = dataset[i + seq_length : i + seq_length + forecast_horizon]
        # Flatten future from shape (10, 2) to (10*2=20,)
        future_flat = future.flatten()
        X.append(past_window)
        y.append(future_flat)
    return np.array(X), np.array(y)

X, Y = create_sequences_multistep(
    data_scaled,
    seq_length=SEQ_LENGTH,
    forecast_horizon=FORECAST_HORIZON
)

print("X shape:", X.shape)  # e.g. (num_samples, 10, 2)
print("Y shape:", Y.shape)  # e.g. (num_samples, 20)
```
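As noted above, the single-step variant reuses the same helper with forecast_horizon=1 (in that case output_size would be 2):

```python
# Single-step targets: Y_single has shape (num_samples, 2)
X_single, Y_single = create_sequences_multistep(data_scaled, seq_length=SEQ_LENGTH, forecast_horizon=1)
```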
We can do a simple 70%/10%/20% split:
```python
train_size = int(len(X) * 0.7)
val_size = int(len(X) * 0.1)
test_size = len(X) - train_size - val_size

X_train = X[:train_size]
Y_train = Y[:train_size]
X_val = X[train_size : train_size + val_size]
Y_val = Y[train_size : train_size + val_size]
X_test = X[train_size + val_size:]
Y_test = Y[train_size + val_size:]
```
```python
X_train_tensors = torch.tensor(X_train, dtype=torch.float32).to(device)
Y_train_tensors = torch.tensor(Y_train, dtype=torch.float32).to(device)
X_val_tensors = torch.tensor(X_val, dtype=torch.float32).to(device)
Y_val_tensors = torch.tensor(Y_val, dtype=torch.float32).to(device)
X_test_tensors = torch.tensor(X_test, dtype=torch.float32).to(device)
Y_test_tensors = torch.tensor(Y_test, dtype=torch.float32).to(device)
```
DataLoader for Training

Since you want to train in batches, define a TensorDataset for train and then create a DataLoader:
```python
train_dataset = TensorDataset(X_train_tensors, Y_train_tensors)

batch_size = 64  # as in your code
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
```
(Optionally, you can also define a validation or test loader for iterating in mini-batches, but it’s often okay to just do an entire pass in evaluation mode for val/test.)
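If you do want batched validation or testing, a minimal sketch mirroring the train loader (shuffling is disabled so results stay in temporal order):

```python
val_dataset = TensorDataset(X_val_tensors, Y_val_tensors)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
```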
You already have the architecture:
```python
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size, dropout=0.5):
        super(LSTMModel, self).__init__()
        # LSTM Layer
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers,
                            batch_first=True, dropout=dropout)
        # Dropout layer
        self.dropout = nn.Dropout(0.2)
        # Dense Layer
        self.dense = nn.Linear(hidden_size, 256)   # Dense layer with 256 units
        self.output = nn.Linear(256, output_size)  # Output layer => final prediction

    def forward(self, x):
        out, _ = self.lstm(x)
        out = self.dropout(out)
        out = self.dense(out[:, -1, :])  # take only the last time step
        out = self.output(out)
        return out

# Hyperparameters
input_size = 2     # (col1, col2)
hidden_size = 64
num_layers = 3
output_size = 20   # 10 time steps * 2 features
dropout = 0.5
learning_rate = 0.005  # or 0.001
num_epochs = 22
```
Create an instance of the model and move it to device:
```python
model = LSTMModel(
    input_size=input_size,
    hidden_size=hidden_size,
    num_layers=num_layers,
    output_size=output_size,
    dropout=dropout
).to(device)
```
```python
criterion = nn.MSELoss()  # or nn.SmoothL1Loss(), etc.
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
```
We’ll do a simple loop for num_epochs epochs. You can incorporate early stopping or validation checks if you like.
```python
for epoch in range(num_epochs):
    model.train()
    total_loss = 0.0
    for X_batch, Y_batch in train_loader:
        # (X_batch, Y_batch) are already on device if you used .to(device) earlier
        optimizer.zero_grad()

        # Forward pass
        outputs = model(X_batch)  # shape (batch_size, 20)

        # Compute loss
        loss = criterion(outputs, Y_batch)

        # Backprop
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    avg_train_loss = total_loss / len(train_loader)

    # Optionally, check validation loss
    model.eval()
    with torch.no_grad():
        val_outputs = model(X_val_tensors)
        val_loss = criterion(val_outputs, Y_val_tensors).item()

    print(f"Epoch [{epoch+1}/{num_epochs}], "
          f"Train Loss: {avg_train_loss:.6f}, "
          f"Val Loss: {val_loss:.6f}")
```
After training, evaluate on the test set: reshape the flattened predictions to (num_samples, 10, 2) so you can interpret them as 10-step forecasts, and inverse-transform with the scaler if you want results in original units.

(Optional) Test DataLoader:

```python
test_dataset = TensorDataset(X_test_tensors, Y_test_tensors)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
```
```python
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

def calculate_accuracy(y_true, y_pred, threshold=0.10):
    """
    e.g., threshold=0.10 for 10% tolerance
    y_true, y_pred: shape (n,) flattened arrays
    """
    within_tolerance = np.abs(y_true - y_pred) <= threshold * np.abs(y_true)
    return np.mean(within_tolerance) * 100

model.eval()
all_predictions = []
all_actuals = []

with torch.no_grad():
    for inputs, targets in test_loader:
        inputs, targets = inputs.to(device), targets.to(device)
        # Generate predictions
        outputs = model(inputs)  # shape (batch, 20)
        all_predictions.append(outputs.cpu().numpy())
        all_actuals.append(targets.cpu().numpy())

# Concatenate to get full arrays
all_predictions = np.concatenate(all_predictions, axis=0)  # shape (N, 20)
all_actuals = np.concatenate(all_actuals, axis=0)          # shape (N, 20)

# Reshape to (N, 10, 2)
all_predictions = all_predictions.reshape(-1, 10, 2)
all_actuals = all_actuals.reshape(-1, 10, 2)

# Flatten to compute metrics => shape (N * 10 * 2,)
preds_flat = all_predictions.flatten()
actuals_flat = all_actuals.flatten()

mse = mean_squared_error(actuals_flat, preds_flat)
rmse = np.sqrt(mse)
mae = mean_absolute_error(actuals_flat, preds_flat)
r2 = r2_score(actuals_flat, preds_flat)
acc = calculate_accuracy(actuals_flat, preds_flat, threshold=0.10)  # 10% threshold

print(f"MSE : {mse:.4f}")
print(f"RMSE: {rmse:.4f}")
print(f"MAE : {mae:.4f}")
print(f"R^2 : {r2:.4f}")
print(f"Accuracy (±10%): {acc:.2f}%")
```
If you want metrics in original scale, you must:

1. Reshape all_predictions to (N * 10, 2).
2. Apply scaler.inverse_transform(...).
3. Do the same for all_actuals, also inverse-transformed.

Example:
```python
# Flatten from (N, 10, 2) to (N*10, 2)
preds_2d = all_predictions.reshape(-1, 2)
acts_2d = all_actuals.reshape(-1, 2)

preds_inv = scaler.inverse_transform(preds_2d)
acts_inv = scaler.inverse_transform(acts_2d)

# Now flatten for metrics
preds_inv_flat = preds_inv.flatten()
acts_inv_flat = acts_inv.flatten()

mse_inv = mean_squared_error(acts_inv_flat, preds_inv_flat)
rmse_inv = np.sqrt(mse_inv)
mae_inv = mean_absolute_error(acts_inv_flat, preds_inv_flat)
r2_inv = r2_score(acts_inv_flat, preds_inv_flat)
acc_inv = calculate_accuracy(acts_inv_flat, preds_inv_flat, threshold=0.10)

print("\n=== Metrics on Original Scale ===")
print(f"MSE : {mse_inv:.4f}")
print(f"RMSE: {rmse_inv:.4f}")
print(f"MAE : {mae_inv:.4f}")
print(f"R^2 : {r2_inv:.4f}")
print(f"Accuracy (±10%): {acc_inv:.2f}%")
```
If you want to predict the next 10 steps after your dataset ends, you can take the last SEQ_LENGTH rows of your scaled data and run them through the model:
```python
def predict_future(model, data_scaled, seq_length=10, forecast_horizon=10):
    """
    data_scaled: shape (N, 2)
    Take the last seq_length steps as input and produce the next 10-step forecast.
    """
    model.eval()
    window = data_scaled[-seq_length:]  # shape (10, 2)
    # Convert to (1, seq_length, 2)
    x_t = torch.tensor(window[np.newaxis, :, :], dtype=torch.float32).to(device)
    with torch.no_grad():
        preds = model(x_t)  # shape (1, 20)
    # Reshape to (10, 2)
    preds = preds.cpu().numpy().reshape(-1, 2)
    return preds

future_preds_scaled = predict_future(model, data_scaled, seq_length=10, forecast_horizon=10)

# Inverse transform to original scale
future_preds = scaler.inverse_transform(future_preds_scaled)
print("Next 10 predictions (col1, col2):\n", future_preds)
```
Below is a condensed version of the code that includes all the key steps. You can copy, paste, and then customize:
```python
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import TensorDataset, DataLoader
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import matplotlib.pyplot as plt

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

##############################################################################
# 1. Load Data
##############################################################################
df = pd.read_csv("realDataForTrain.csv")
data = df[['col1', 'col2']].values  # shape (N, 2)

##############################################################################
# 2. Scaling
##############################################################################
scaler = MinMaxScaler()
data_scaled = scaler.fit_transform(data)

##############################################################################
# 3. Sequence Creation
##############################################################################
SEQ_LENGTH = 10
FORECAST_HORIZON = 10  # Next 10 steps => output_size=20

def create_sequences_multistep(dataset, seq_length=10, forecast_horizon=10):
    X, y = [], []
    for i in range(len(dataset) - seq_length - forecast_horizon + 1):
        past_window = dataset[i : i + seq_length]
        future = dataset[i + seq_length : i + seq_length + forecast_horizon]
        future_flat = future.flatten()
        X.append(past_window)
        y.append(future_flat)
    return np.array(X), np.array(y)

X, Y = create_sequences_multistep(data_scaled, SEQ_LENGTH, FORECAST_HORIZON)

##############################################################################
# 4. Train/Val/Test Split
##############################################################################
train_size = int(0.7 * len(X))
val_size = int(0.1 * len(X))
test_size = len(X) - train_size - val_size

X_train, Y_train = X[:train_size], Y[:train_size]
X_val, Y_val = X[train_size : train_size + val_size], Y[train_size : train_size + val_size]
X_test, Y_test = X[train_size + val_size:], Y[train_size + val_size:]

##############################################################################
# 5. Convert to Tensors
##############################################################################
X_train_tensors = torch.tensor(X_train, dtype=torch.float32).to(device)
Y_train_tensors = torch.tensor(Y_train, dtype=torch.float32).to(device)
X_val_tensors = torch.tensor(X_val, dtype=torch.float32).to(device)
Y_val_tensors = torch.tensor(Y_val, dtype=torch.float32).to(device)
X_test_tensors = torch.tensor(X_test, dtype=torch.float32).to(device)
Y_test_tensors = torch.tensor(Y_test, dtype=torch.float32).to(device)

##############################################################################
# 6. DataLoader for Training
##############################################################################
train_dataset = TensorDataset(X_train_tensors, Y_train_tensors)
batch_size = 64
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

##############################################################################
# 7. Define the Model
##############################################################################
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size, dropout=0.5):
        super(LSTMModel, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers,
                            batch_first=True, dropout=dropout)
        self.dropout = nn.Dropout(0.2)
        self.dense = nn.Linear(hidden_size, 256)
        self.output = nn.Linear(256, output_size)

    def forward(self, x):
        out, _ = self.lstm(x)
        out = self.dropout(out)
        out = self.dense(out[:, -1, :])
        out = self.output(out)
        return out

input_size = 2
hidden_size = 64
num_layers = 3
output_size = 20  # 10 steps * 2 features
dropout = 0.5
learning_rate = 0.005
num_epochs = 22

model = LSTMModel(input_size, hidden_size, num_layers, output_size, dropout=dropout).to(device)

##############################################################################
# 8. Define Loss & Optimizer
##############################################################################
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

##############################################################################
# 9. Training Loop
##############################################################################
for epoch in range(num_epochs):
    model.train()
    total_loss = 0.0
    for X_batch, Y_batch in train_loader:
        optimizer.zero_grad()
        outputs = model(X_batch)  # (batch_size, 20)
        loss = criterion(outputs, Y_batch)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    avg_train_loss = total_loss / len(train_loader)

    # Validation
    model.eval()
    with torch.no_grad():
        val_outputs = model(X_val_tensors)
        val_loss = criterion(val_outputs, Y_val_tensors).item()

    print(f"Epoch [{epoch+1}/{num_epochs}], "
          f"Train Loss: {avg_train_loss:.6f}, "
          f"Val Loss: {val_loss:.6f}")

##############################################################################
# 10. Evaluate on Test
##############################################################################
test_dataset = TensorDataset(X_test_tensors, Y_test_tensors)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

def calculate_accuracy(y_true, y_pred, threshold=0.10):
    within_tolerance = np.abs(y_true - y_pred) <= threshold * np.abs(y_true)
    return np.mean(within_tolerance) * 100

model.eval()
all_predictions = []
all_actuals = []

with torch.no_grad():
    for inputs, targets in test_loader:
        inputs, targets = inputs.to(device), targets.to(device)
        outputs = model(inputs)
        all_predictions.append(outputs.cpu().numpy())
        all_actuals.append(targets.cpu().numpy())

all_predictions = np.concatenate(all_predictions, axis=0)  # (N, 20)
all_actuals = np.concatenate(all_actuals, axis=0)          # (N, 20)

# Reshape to (N, 10, 2)
all_predictions = all_predictions.reshape(-1, 10, 2)
all_actuals = all_actuals.reshape(-1, 10, 2)

preds_flat = all_predictions.flatten()
actuals_flat = all_actuals.flatten()

mse = mean_squared_error(actuals_flat, preds_flat)
rmse = np.sqrt(mse)
mae = mean_absolute_error(actuals_flat, preds_flat)
r2 = r2_score(actuals_flat, preds_flat)
acc = calculate_accuracy(actuals_flat, preds_flat, threshold=0.10)

print(f"\n=== Test Set Metrics (Scaled Space) ===")
print(f"MSE: {mse:.6f}")
print(f"RMSE: {rmse:.6f}")
print(f"MAE: {mae:.6f}")
print(f"R^2: {r2:.6f}")
print(f"Accuracy (±10%): {acc:.2f}%")
```
Architecture: You replaced your older LSTM code with the new LSTMModel, which uses a single nn.LSTM module (with num_layers=3 internally), plus a dropout, a dense layer, and an output layer of size output_size=20.
Data: Created a multi-step (10-step) forecast problem by flattening the future window.
Training: Standard PyTorch loop with nn.MSELoss() and the Adam optimizer. The final layer of your LSTM code returns shape (batch_size, 20), matching the flattened 10-step * 2 features.
Evaluation: Collected predictions from the test loader, reshaped them to (N, 10, 2), and computed error metrics.
Optional: Inverse scale predictions if you want metrics in the original units.
With these steps, you’re now using the new LSTM architecture in your existing pipeline, and you have a clear, step-by-step blueprint for how it all connects.
Below is a simple Python function you can use to calculate accuracy for an LSTM (or any regression model) based on a 10% threshold. It assumes your targets and predictions are NumPy arrays of the same shape:
```python
import numpy as np

def calculate_accuracy(y_true, y_pred, threshold=0.10):
    """
    Calculate accuracy as the percentage of predictions where the
    absolute error is within 'threshold' times the absolute value
    of the true target.

    Parameters
    ----------
    y_true : np.ndarray
        Ground-truth values.
    y_pred : np.ndarray
        Model predictions, same shape as y_true.
    threshold : float, optional (default=0.10)
        The percentage threshold (e.g., 0.10 for 10%).

    Returns
    -------
    accuracy : float
        Accuracy percentage (0 to 100).
    """
    # Ensure both arrays are flattened or the same shape
    y_true_flat = y_true.flatten()
    y_pred_flat = y_pred.flatten()

    # Avoid division by zero by also checking if y_true is zero
    # (you can decide how to handle points where y_true is close to 0)
    within_tolerance = np.abs(y_true_flat - y_pred_flat) <= threshold * np.abs(y_true_flat)

    # Convert the fraction to a percentage
    accuracy = np.mean(within_tolerance) * 100
    return accuracy
```
```python
# Suppose you have these arrays (e.g., after model evaluation)
y_true = np.array([1.0, 2.0, 3.0, 4.0])
y_pred = np.array([1.05, 2.2, 2.7, 4.1])

acc_10pct = calculate_accuracy(y_true, y_pred, threshold=0.10)
print(f"Accuracy with ±10% tolerance: {acc_10pct:.2f}%")
```