LSTM Forecasting with PyTorch

Creato il: 17 gennaio 2025

Risposto usando OpenAI 01 da OpenAI01

Domanda

i have csv file it's name is realDataForTrain.csv the first part of csv file is like this : col1,col2
-0.09049732849448675,97.07421138751958
0.01706581776673801,8.931705375213935
-0.12610981210709962,35.23775348541896
0.031167010198950122,86.54867751356406
-0.053238021550445415,22.287907143775364
0.020438444672281317,31.388636228767187
-0.01565222813575494,93.97151854250646
0.22906818845030516,69.4499608684432
-0.2611698787716817,93.64285187572544
-0.2957446866807184,18.761178121102905 the data consider 854 rows of data i need to feed this data to LSTM model use pytorch and consider i'm beginner step by step load data preprocess data to feed LSTM neural network then fit model and evaluate if needed plot for visualize data and evaluation result for evaluation i need accuracy metric beside other metric and i need my network predict 10 next value of col1, col2

Pensiero

Loading and preprocessing data

I’m working through the steps to handle LSTM for time series with PyTorch, focusing on importing data, preparing it, and defining the model.

Mapping out steps

I’m outlining the process to measure accuracy for continuous variables, which involves using metrics like MSE, RMSE, or MAE, and making predictions for the next 10 values.

Thought about LSTM model setup for 16 seconds

Risposta

Below is an end-to-end example using PyTorch to:

  1. Load the CSV data using pandas.
  2. Preprocess it (including normalization and converting to sequences).
  3. Define an LSTM model for forecasting.
  4. Train (fit) the model.
  5. Evaluate it using accuracy and other metrics (for demonstration).
  6. Predict the next 10 values of both col1 and col2.
  7. Optionally visualize the results.

Note: In real-life time series forecasting with continuous values, using accuracy (which is typical for classification) can be less meaningful. Common metrics for regression tasks include MSE, RMSE, MAE, and R^2. However, we will show how to compute an approximate accuracy alongside other metrics for illustrative purposes.


1. Install and Import Required Libraries

!pip install pandas numpy matplotlib torch
import pandas as pd import numpy as np import torch import torch.nn as nn import torch.optim as optim from sklearn.preprocessing import MinMaxScaler import matplotlib.pyplot as plt # For reproducibility torch.manual_seed(42) np.random.seed(42)

2. Load the CSV Data

Assume your file is named realDataForTrain.csv and has at least two columns: col1 and col2. The file might look like:

col1,col2
-0.09049732849448675,97.07421138751958
0.01706581776673801,8.931705375213935
...
# Load data df = pd.read_csv("realDataForTrain.csv") print(df.head()) print(f"Data shape: {df.shape}")

3. Preprocessing: Normalization and Sequence Creation

3.1 Normalize the data

For neural networks, it often helps to normalize or scale data. Below, we use MinMax scaling to bring each column into [0, 1]. You can also use StandardScaler or skip scaling, depending on your preference.

scaler = MinMaxScaler() scaled_data = scaler.fit_transform(df[['col1', 'col2']]) # shape: (num_samples, 2)

3.2 Convert to sequences

We need to create sequences of length seq_length that the LSTM can learn from. For example, if seq_length=10, we take 10 consecutive rows as the input (X), and the next row as the target (y). Here, we’ll do a multi-output approach to predict (col1, col2) at the next time step.

def create_sequences(data, seq_length=10): X = [] y = [] for i in range(len(data) - seq_length): # i to i+seq_length-1 is the input seq_x = data[i : i + seq_length] # i+seq_length is the target row seq_y = data[i + seq_length] X.append(seq_x) y.append(seq_y) return np.array(X), np.array(y) SEQ_LENGTH = 10 X, y = create_sequences(scaled_data, SEQ_LENGTH) print("Shape of X:", X.shape) # (samples, seq_length, 2) print("Shape of y:", y.shape) # (samples, 2)
  • X.shape[0] is the number of samples.
  • X.shape[1] == SEQ_LENGTH is the window size.
  • X.shape[2] == 2 because we have 2 features (col1 and col2).

3.3 Split into training and test sets

We can split the data so the model is trained on a portion (e.g., 80%) and tested on the remaining 20%. There are many ways to do time-series splits, but here is a simple approach.

train_size = int(len(X) * 0.8) X_train = X[:train_size] y_train = y[:train_size] X_test = X[train_size:] y_test = y[train_size:] print("Train samples:", X_train.shape[0]) print("Test samples :", X_test.shape[0])

3.4 Convert to PyTorch tensors

X_train_tensors = torch.tensor(X_train, dtype=torch.float32) y_train_tensors = torch.tensor(y_train, dtype=torch.float32) X_test_tensors = torch.tensor(X_test, dtype=torch.float32) y_test_tensors = torch.tensor(y_test, dtype=torch.float32) print("X_train_tensors:", X_train_tensors.shape) print("y_train_tensors:", y_train_tensors.shape)

4. Define the LSTM Model

We will define a simple LSTM for multi-output regression (predicting both col1 and col2). The key parts:

  • input_size = 2 (since we have 2 features: col1, col2).
  • hidden_size can be a hyperparameter (e.g., 32, 64).
  • num_layers is how many LSTM layers to stack.
  • output_size = 2 (predict col1 and col2).
class LSTMModel(nn.Module): def __init__(self, input_size=2, hidden_size=32, num_layers=1, output_size=2): super(LSTMModel, self).__init__() self.hidden_size = hidden_size self.num_layers = num_layers self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True) self.fc = nn.Linear(hidden_size, output_size) def forward(self, x): # x shape: (batch, seq_length, input_size) h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size) c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size) out, _ = self.lstm(x, (h0, c0)) # out: (batch, seq_length, hidden_size) out = out[:, -1, :] # take the last time step's hidden state out = self.fc(out) # shape: (batch, output_size) return out

5. Train (Fit) the Model

We will wrap our dataset in a DataLoader for mini-batch training. If the dataset is small, you could also do a single batch. Here’s a typical training loop with MSE loss.

from torch.utils.data import TensorDataset, DataLoader # Hyperparameters input_size = 2 hidden_size = 32 num_layers = 1 output_size = 2 num_epochs = 30 batch_size = 32 learning_rate = 1e-3 # Create model model = LSTMModel(input_size, hidden_size, num_layers, output_size) criterion = nn.MSELoss() # For regression optimizer = optim.Adam(model.parameters(), lr=learning_rate) # Dataset and DataLoader train_dataset = TensorDataset(X_train_tensors, y_train_tensors) train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True) # Training loop model.train() for epoch in range(num_epochs): for X_batch, y_batch in train_loader: optimizer.zero_grad() outputs = model(X_batch) # shape: (batch_size, 2) loss = criterion(outputs, y_batch) loss.backward() optimizer.step() if (epoch+1) % 5 == 0: print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.6f}")

6. Evaluation

After training, we evaluate on the test set. We’ll compute:

  • MSE or RMSE to reflect how close predictions are to the true values.
  • “Accuracy” as an illustrative example. Because this is a continuous regression, we might define an accuracy threshold: if y^y<some small epsilon\| \hat{y} - y \| < \text{some small epsilon}, count it as a correct prediction.
model.eval() with torch.no_grad(): predictions = model(X_test_tensors) # shape: (test_samples, 2) # Convert back to numpy predictions = predictions.numpy() y_test_np = y_test_tensors.numpy() # Inverse scaling to get original scale predictions_inv = scaler.inverse_transform(predictions) y_test_inv = scaler.inverse_transform(y_test_np)

6.1 Compute MSE, RMSE, MAE, R^2

mse = np.mean((predictions - y_test_np)**2) rmse = np.sqrt(mse) mae = np.mean(np.abs(predictions - y_test_np)) # For R^2, we can do: ss_res = np.sum((y_test_np - predictions)**2) ss_tot = np.sum((y_test_np - np.mean(y_test_np, axis=0))**2) r2 = 1 - (ss_res / ss_tot) print(f"MSE : {mse:.6f}") print(f"RMSE: {rmse:.6f}") print(f"MAE : {mae:.6f}") print(f"R^2 : {r2:.6f}")

6.2 Approximate “accuracy” for continuous data (for demonstration)

Suppose we define an ϵ\epsilon-threshold so that if

(y1(pred)y1(true))2+(y2(pred)y2(true))2<ϵ\sqrt{(y_1^{(pred)} - y_1^{(true)})^2 + (y_2^{(pred)} - y_2^{(true)})^2} < \epsilon

then the prediction is considered “accurate.” For example, ϵ=0.1\epsilon = 0.1 in scaled space.

epsilon = 0.1 diff = np.linalg.norm(predictions - y_test_np, axis=1) # Euclidean distance accuracy = np.mean(diff < epsilon) print(f"Accuracy (with epsilon={epsilon} in scaled space): {accuracy*100:.2f}%")

7. Predict the Next 10 Values

To predict the next steps, you typically:

  1. Take the last seq_length data points from the dataset (scaled).
  2. Pass them to the model to get a prediction.
  3. Append that prediction as the “new” row to the end of your data.
  4. Repeat for the desired forecast horizon (10 steps in your request).

Below is a simplified approach that does “roll-forward” predictions.

def predict_future(model, data, scaler, seq_length=10, steps=10): """ data: full scaled dataset (np.array with shape (N, 2)) seq_length: how many past points LSTM looks at steps: how many future steps to predict """ model.eval() # Start with the last seq_length points window = data[-seq_length:].copy() # shape (seq_length, 2) predictions_future = [] for _ in range(steps): # Convert window to tensor, add batch dimension input_tensor = torch.tensor(window[np.newaxis, :, :], dtype=torch.float32) with torch.no_grad(): pred = model(input_tensor) # shape: (1, 2) pred_np = pred.numpy()[0] # shape: (2,) # Append prediction predictions_future.append(pred_np) # Roll window: drop the oldest row, and add the prediction window = np.vstack([window[1:], pred_np]) # Inverse scale the predictions predictions_future = np.array(predictions_future) predictions_future_inv = scaler.inverse_transform(predictions_future) return predictions_future_inv # Predict the next 10 values future_preds = predict_future(model, scaled_data, scaler, seq_length=SEQ_LENGTH, steps=10) print("Next 10 predictions (col1, col2):") for i, (p1, p2) in enumerate(future_preds, start=1): print(f"Step {i}: col1={p1:.4f}, col2={p2:.4f}")

8. (Optional) Visualization

You may want to compare the predicted vs. actual curves. For instance, plot the test set portion of the time series (both columns) in the original scale.

# We'll plot col1 only for demonstration, similarly you can plot col2 plt.figure(figsize=(12, 4)) # Indices for test data test_index = np.arange(train_size+SEQ_LENGTH, train_size+SEQ_LENGTH+len(y_test_inv)) plt.plot(test_index, y_test_inv[:, 0], label="Actual col1", color="blue") plt.plot(test_index, predictions_inv[:, 0], label="Predicted col1", color="red") plt.xlabel("Time (index)") plt.ylabel("col1 value") plt.title("col1 Actual vs Predicted on Test Set") plt.legend() plt.show()

Similarly, you can repeat for col2 or create multi-plot figures.


Putting It All Together

Below is a condensed script to illustrate the flow in one place:

import pandas as pd import numpy as np import torch import torch.nn as nn import torch.optim as optim from sklearn.preprocessing import MinMaxScaler from torch.utils.data import TensorDataset, DataLoader import matplotlib.pyplot as plt # 1. Load data df = pd.read_csv("realDataForTrain.csv") # 2. Normalize scaler = MinMaxScaler() scaled_data = scaler.fit_transform(df[['col1', 'col2']]) # 3. Create sequences SEQ_LENGTH = 10 def create_sequences(data, seq_length=10): X, y = [], [] for i in range(len(data) - seq_length): X.append(data[i : i + seq_length]) y.append(data[i + seq_length]) return np.array(X), np.array(y) X, y = create_sequences(scaled_data, SEQ_LENGTH) train_size = int(len(X)*0.8) X_train = X[:train_size]; y_train = y[:train_size] X_test = X[train_size:]; y_test = y[train_size:] X_train_tensors = torch.tensor(X_train, dtype=torch.float32) y_train_tensors = torch.tensor(y_train, dtype=torch.float32) X_test_tensors = torch.tensor(X_test, dtype=torch.float32) y_test_tensors = torch.tensor(y_test, dtype=torch.float32) # 4. Define LSTM class LSTMModel(nn.Module): def __init__(self, input_size=2, hidden_size=32, num_layers=1, output_size=2): super(LSTMModel, self).__init__() self.hidden_size = hidden_size self.num_layers = num_layers self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True) self.fc = nn.Linear(hidden_size, output_size) def forward(self, x): h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size) c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size) out, _ = self.lstm(x, (h0, c0)) out = out[:, -1, :] out = self.fc(out) return out # 5. Train model = LSTMModel() criterion = nn.MSELoss() optimizer = optim.Adam(model.parameters(), lr=1e-3) train_dataset = TensorDataset(X_train_tensors, y_train_tensors) train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True) for epoch in range(30): for X_batch, y_batch in train_loader: optimizer.zero_grad() outputs = model(X_batch) loss = criterion(outputs, y_batch) loss.backward() optimizer.step() if (epoch+1) % 5 == 0: print(f"Epoch [{epoch+1}/30], Loss: {loss.item():.6f}") # 6. Evaluate model.eval() with torch.no_grad(): predictions = model(X_test_tensors).numpy() y_test_np = y_test_tensors.numpy() predictions_inv = scaler.inverse_transform(predictions) y_test_inv = scaler.inverse_transform(y_test_np) mse = np.mean((predictions - y_test_np)**2) rmse = np.sqrt(mse) mae = np.mean(np.abs(predictions - y_test_np)) ss_res = np.sum((y_test_np - predictions)**2) ss_tot = np.sum((y_test_np - np.mean(y_test_np, axis=0))**2) r2 = 1 - (ss_res / ss_tot) print(f"MSE : {mse:.6f}") print(f"RMSE: {rmse:.6f}") print(f"MAE : {mae:.6f}") print(f"R^2 : {r2:.6f}") # Approximate accuracy epsilon = 0.1 diff = np.linalg.norm(predictions - y_test_np, axis=1) accuracy = np.mean(diff < epsilon) print(f"Accuracy (with epsilon={epsilon} in scaled space): {accuracy*100:.2f}%") # 7. Predict the next 10 values def predict_future(model, data, scaler, seq_length=10, steps=10): model.eval() window = data[-seq_length:].copy() preds = [] for _ in range(steps): x_t = torch.tensor(window[np.newaxis, :, :], dtype=torch.float32) with torch.no_grad(): pred = model(x_t) pred_np = pred.numpy()[0] preds.append(pred_np) window = np.vstack([window[1:], pred_np]) preds = np.array(preds) return scaler.inverse_transform(preds) future_preds = predict_future(model, scaled_data, scaler, seq_length=SEQ_LENGTH, steps=10) print("Next 10 predictions (col1, col2):") for i, (p1, p2) in enumerate(future_preds, start=1): print(f"Step {i}: col1={p1:.4f}, col2={p2:.4f}") # 8. (Optional) Plot plt.figure(figsize=(12, 4)) test_index = range(train_size+SEQ_LENGTH, train_size+SEQ_LENGTH+len(y_test_inv)) plt.plot(test_index, y_test_inv[:, 0], label="Actual col1", color="blue") plt.plot(test_index, predictions_inv[:, 0], label="Predicted col1", color="red") plt.title("col1 Actual vs Predicted") plt.legend() plt.show()

Final Notes

  1. Sequence length: You can tune SEQ_LENGTH (window size) to see which works best.
  2. Hidden size, number of layers, epochs: These are hyperparameters to experiment with for better accuracy.
  3. Loss function: For time series with continuous values, MSE or MAE is common.
  4. Accuracy: For a real regression scenario, rely on RMSE/MAE/R^2 rather than classification-based accuracy.
  5. Scaling: If your data has drastically different scales, you may consider separate scalers for each column or use StandardScaler.

This step-by-step guide should get you started on training an LSTM in PyTorch for your CSV data and making multi-step forecasts. Good luck!

Condividi questa Q&A