LSTM Data Preprocessing and Fit
Created: 17 January 2025
Loading and preparing data
I’m gathering the dataset into Pandas, focusing on col1 and col2. Then, I’m scaling and shifting the data, splitting it into training and validation sets, and reshaping it into a 3D format for the LSTM model.
Shaping and scaling
I’m outlining the steps to prepare the data: creating sequences, scaling, and splitting into X and Y arrays. X will capture the window of data, and Y will predict the next steps.
Below is a step-by-step guide (with example code) on how you can preprocess a 2-feature time series for predicting the next 10 steps with an LSTM, then build, train, and evaluate the model in TensorFlow/Keras. Adjust window sizes, hyperparameters, etc., as needed for your specific use case.
    import numpy as np
    import pandas as pd
    from sklearn.preprocessing import MinMaxScaler
    from tensorflow.keras.models import Sequential
    from tensorflow.keras.layers import LSTM, Dense, Dropout
    from tensorflow.keras.optimizers import Adam
    # Assume you already have:
    # data = pd.read_csv('realDataForTrain.csv', skiprows=1, names=['col1', 'col2'])
    data.info()  # info() prints its summary itself and returns None
    print(data.head())
You have two columns: col1 in [-100, 100] and col2 in [0, 100].
LSTMs often perform better when data is scaled (e.g., between 0 and 1). Because your features have different ranges, a good practice is to scale them using a MinMaxScaler.
    scaler = MinMaxScaler(feature_range=(0, 1))
    scaled_data = scaler.fit_transform(data[['col1', 'col2']])
    print(f"Scaled shape: {scaled_data.shape}")
    # scaled_data is now a NumPy array with shape (853, 2)
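One caveat with the snippet above: fitting the scaler on the full series lets the minima and maxima of the later (test) rows influence the training inputs. Here is a minimal NumPy sketch of the same min-max formula, fitted on the training rows only. The helper names (`fit_minmax`, `apply_minmax`, `invert_minmax`), the toy array, and the split index are illustrative assumptions, not part of the original code.

```python
import numpy as np

def fit_minmax(train):
    """Return per-column (min, max) computed from the training rows only."""
    return train.min(axis=0), train.max(axis=0)

def apply_minmax(x, col_min, col_max):
    """Scale each column relative to the training range (train rows land in [0, 1])."""
    return (x - col_min) / (col_max - col_min)

def invert_minmax(x_scaled, col_min, col_max):
    """Undo apply_minmax and return values in the original units."""
    return x_scaled * (col_max - col_min) + col_min

# Toy stand-in for data[['col1', 'col2']].to_numpy(); the last row plays the test set.
raw = np.array([
    [-100.0, 0.0],
    [0.0, 50.0],
    [100.0, 100.0],
    [50.0, 25.0],
    [120.0, 80.0],
])
split = 4  # hypothetical boundary between training and test rows

col_min, col_max = fit_minmax(raw[:split])
train_scaled = apply_minmax(raw[:split], col_min, col_max)
test_scaled = apply_minmax(raw[split:], col_min, col_max)
restored = invert_minmax(test_scaled, col_min, col_max)

print(train_scaled)
print(test_scaled)
```

Test values can fall slightly outside [0, 1] when they exceed the training range, as the last row does here. With scikit-learn, the equivalent is calling `scaler.fit` on the training rows and `scaler.transform` on the rest.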
We want to predict the next 10 steps given a certain window of prior data (for example, 20 timesteps of history).
Here is a function that, given a 2D array (scaled_data), builds input sequences (X) and corresponding labels (y).
    def create_sequences(dataset, window_size=20, horizon=10):
        """
        dataset: scaled dataset of shape (num_samples, num_features)
        window_size: how many past timesteps to include
        horizon: how many future timesteps to predict
        """
        X, y = [], []
        # We stop at len(dataset) - window_size - horizon
        # so we don't go out of bounds.
        for i in range(len(dataset) - window_size - horizon + 1):
            seq_x = dataset[i : i + window_size]
            seq_y = dataset[i + window_size : i + window_size + horizon]
            X.append(seq_x)
            y.append(seq_y)
        return np.array(X), np.array(y)

    window_size = 20
    horizon = 10
    X, y = create_sequences(scaled_data, window_size=window_size, horizon=horizon)
    print("X shape:", X.shape)  # (num_samples, window_size, num_features)
    print("y shape:", y.shape)  # (num_samples, horizon, num_features)
X.shape would look like (853 - 20 - 10 + 1, 20, 2) = (824, 20, 2).
y.shape would look like (824, 10, 2).
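To make the shape arithmetic concrete, the same windowing logic can be run on a tiny series where every value is easy to trace. The toy array below (8 timesteps, 2 features) and the small window and horizon are assumptions chosen only for illustration.

```python
import numpy as np

def create_sequences(dataset, window_size, horizon):
    """Slide a window over the rows; each window's target is the `horizon` rows after it."""
    X, y = [], []
    for i in range(len(dataset) - window_size - horizon + 1):
        X.append(dataset[i : i + window_size])
        y.append(dataset[i + window_size : i + window_size + horizon])
    return np.array(X), np.array(y)

# Toy series: row t holds [t, 10 * t], so the time index is visible in the data.
toy = np.column_stack([np.arange(8), 10 * np.arange(8)])
X_toy, y_toy = create_sequences(toy, window_size=3, horizon=2)

print(X_toy.shape)     # (4, 3, 2) because 8 - 3 - 2 + 1 = 4
print(y_toy.shape)     # (4, 2, 2)
print(X_toy[0][:, 0])  # [0 1 2]  -> the first window covers timesteps 0..2
print(y_toy[0][:, 0])  # [3 4]    -> its target covers timesteps 3..4
```

The last sample starts at index 3, so its target ends exactly at the final row of the series.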
Let’s split our sequences into a train set and a test set. You can also consider having a separate validation set if desired. For simplicity, let’s do an 80%-train / 20%-test split.
    train_size = int(len(X) * 0.8)

    X_train = X[:train_size]
    y_train = y[:train_size]
    X_test = X[train_size:]
    y_test = y[train_size:]

    print("Train set:", X_train.shape, y_train.shape)
    print("Test set:", X_test.shape, y_test.shape)
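If you do want the separate validation set mentioned above, one option is a three-way chronological split. The sketch below is an assumption-laden illustration: the function name `chronological_split`, the 70/15/15 fractions, and the zero-filled placeholder arrays are not from the original code.

```python
import numpy as np

def chronological_split(X, y, train_frac=0.7, val_frac=0.15):
    """Split windows in time order: earliest for training, latest for testing."""
    n = len(X)
    train_end = int(n * train_frac)
    val_end = int(n * (train_frac + val_frac))
    train = (X[:train_end], y[:train_end])
    val = (X[train_end:val_end], y[train_end:val_end])
    test = (X[val_end:], y[val_end:])
    return train, val, test

# Placeholder arrays with the shapes produced by create_sequences in this guide.
X_demo = np.zeros((824, 20, 2))
y_demo = np.zeros((824, 10, 2))

(train_X, train_y), (val_X, val_y), (test_X, test_y) = chronological_split(X_demo, y_demo)
print(len(train_X), len(val_X), len(test_X))
```

The validation pair would then go to `validation_data` in `model.fit`, leaving the test pair untouched until the final `model.evaluate` call. No shuffling is applied, so the order of the time series is preserved.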
We need a model that outputs horizon * num_features values if we want to predict all timesteps at once. Alternatively, we can output (horizon, num_features) directly by using an LSTM layer with return_sequences=True and additional time-distributed layers.
Below is one approach: a final Dense layer with units = horizon * num_features. We’ll create a simple single-layer LSTM followed by a Dense layer that outputs horizon * 2 units (because we have 2 features and want 10 timesteps → 10 * 2 = 20 outputs).
    num_features = X.shape[2]  # should be 2

    model = Sequential()
    model.add(LSTM(64, return_sequences=False, input_shape=(window_size, num_features)))
    model.add(Dropout(0.2))
    model.add(Dense(horizon * num_features))  # 10 timesteps * 2 features = 20 outputs

    model.compile(loss='mse', optimizer=Adam(learning_rate=0.001), metrics=['mse'])
    model.summary()
Note: If you want the model to output a 3D shape directly (i.e., shape (horizon, num_features)), you’d do something more advanced (like return_sequences=True and a TimeDistributed Dense). But for a simpler single Dense output, we flatten to 20 units.
Because our y_train shape is (samples, horizon, num_features), we must reshape it to (samples, horizon * num_features) to match the model’s final Dense(horizon * num_features) layer.
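The reshape described above is lossless, and it helps to know how the flattened columns are ordered. This small NumPy sketch uses a dummy target array (an assumption for illustration) to show the round trip and the index mapping.

```python
import numpy as np

horizon, num_features = 10, 2

# Dummy targets for 3 samples; values are just a running counter.
y_demo = np.arange(3 * horizon * num_features).reshape(3, horizon, num_features)

# Flatten (samples, horizon, num_features) -> (samples, horizon * num_features)
y_flat = y_demo.reshape(y_demo.shape[0], horizon * num_features)
print(y_flat.shape)  # (3, 20)

# Restore the 3D layout, as done later for the model's predictions.
y_back = y_flat.reshape(-1, horizon, num_features)

# With NumPy's default row-major order, flat column k maps to
# forecast step k // num_features and feature k % num_features.
k = 7
step, feature = divmod(k, num_features)
print(step, feature)  # 3 1
```

So in the 20-wide output, columns 0 and 1 are the two features at the first forecast step, columns 2 and 3 the second step, and so on.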
    y_train_reshaped = y_train.reshape(y_train.shape[0], horizon * num_features)
    y_test_reshaped = y_test.reshape(y_test.shape[0], horizon * num_features)

    history = model.fit(
        X_train, y_train_reshaped,
        validation_data=(X_test, y_test_reshaped),
        epochs=50,      # Adjust epochs as needed
        batch_size=32,  # Adjust batch_size as needed
        verbose=1
    )
To evaluate, we can look at the final loss on the test set, or we can get predictions and compare them to actual values. Let’s compute MSE on the test set as reported by model.evaluate, then we’ll also do a direct comparison on the actual predictions.
    mse_test = model.evaluate(X_test, y_test_reshaped, verbose=0)
    print(f"Test MSE: {mse_test[0]:.5f}")

    # Get predictions
    predictions = model.predict(X_test)
    print("Predictions shape:", predictions.shape)

    # Reshape predictions back to (samples, horizon, num_features)
    predictions = predictions.reshape(predictions.shape[0], horizon, num_features)

    # If you'd like to invert the scaling for interpretability:
    # 1. Flatten predictions to 2D
    predictions_2d = predictions.reshape(-1, num_features)
    # 2. Invert scaling
    predictions_inverted = scaler.inverse_transform(predictions_2d)
    # 3. Reshape back to 3D
    predictions_inverted = predictions_inverted.reshape(predictions.shape[0], predictions.shape[1], predictions.shape[2])

    # Do the same for y_test
    y_test_2d = y_test.reshape(-1, num_features)
    y_test_inverted = scaler.inverse_transform(y_test_2d)
    y_test_inverted = y_test_inverted.reshape(y_test.shape[0], y_test.shape[1], y_test.shape[2])

    print("Predictions (first sample, all 10 timesteps):")
    print(predictions_inverted[0])
    print("Actual (first sample, all 10 timesteps):")
    print(y_test_inverted[0])
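Beyond printing a single sample, the direct comparison can be summarized as an error per forecast step and per feature, in the original units. The sketch below assumes arrays shaped like `predictions_inverted` and `y_test_inverted`; the function name `rmse_per_step` and the synthetic stand-in data are illustrative assumptions.

```python
import numpy as np

def rmse_per_step(y_true, y_pred):
    """RMSE for every (forecast step, feature) pair, averaged over samples."""
    return np.sqrt(np.mean((y_true - y_pred) ** 2, axis=0))

# Synthetic stand-ins with shape (samples, horizon, num_features).
rng = np.random.default_rng(0)
actual = rng.uniform(0, 100, size=(50, 10, 2))
forecast = actual + rng.normal(0, 5, size=actual.shape)

errors = rmse_per_step(actual, forecast)
print(errors.shape)         # (10, 2): one row per forecast step, one column per feature
print(errors.mean(axis=0))  # average RMSE per feature across the horizon
```

On real forecasts, the rows of `errors` typically show how accuracy changes as the forecast reaches further ahead, which a single MSE number hides.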
Tune window_size (history), horizon (forecast steps), units (LSTM cells), learning_rate, batch_size, and epochs for better performance.
Consider stacking LSTM layers (with return_sequences=True on intermediate layers).
Increase Dropout or add L2 regularization to avoid overfitting if your training set is small.

    import numpy as np
    import pandas as pd
    from sklearn.preprocessing import MinMaxScaler
    from tensorflow.keras.models import Sequential
    from tensorflow.keras.layers import LSTM, Dense, Dropout
    from tensorflow.keras.optimizers import Adam

    # 1. Load data
    data = pd.read_csv('realDataForTrain.csv', skiprows=1, names=['col1', 'col2'])

    # 2. Scale data
    scaler = MinMaxScaler(feature_range=(0, 1))
    scaled_data = scaler.fit_transform(data[['col1', 'col2']])

    # 3. Create sequences
    def create_sequences(dataset, window_size=20, horizon=10):
        X, y = [], []
        for i in range(len(dataset) - window_size - horizon + 1):
            seq_x = dataset[i : i + window_size]
            seq_y = dataset[i + window_size : i + window_size + horizon]
            X.append(seq_x)
            y.append(seq_y)
        return np.array(X), np.array(y)

    window_size = 20
    horizon = 10
    X, y = create_sequences(scaled_data, window_size, horizon)
    num_features = X.shape[2]  # should be 2

    # 4. Train-Test Split
    train_size = int(len(X) * 0.8)
    X_train, X_test = X[:train_size], X[train_size:]
    y_train, y_test = y[:train_size], y[train_size:]

    # 5. Build Model
    model = Sequential()
    model.add(LSTM(64, return_sequences=False, input_shape=(window_size, num_features)))
    model.add(Dropout(0.2))
    model.add(Dense(horizon * num_features))  # 10 steps * 2 features = 20
    model.compile(loss='mse', optimizer=Adam(learning_rate=0.001), metrics=['mse'])
    model.summary()

    # 6. Train Model
    y_train_reshaped = y_train.reshape(y_train.shape[0], horizon * num_features)
    y_test_reshaped = y_test.reshape(y_test.shape[0], horizon * num_features)

    history = model.fit(
        X_train, y_train_reshaped,
        validation_data=(X_test, y_test_reshaped),
        epochs=50,
        batch_size=32,
        verbose=1
    )

    # 7. Evaluate
    mse_test = model.evaluate(X_test, y_test_reshaped, verbose=0)
    print(f"Test MSE: {mse_test[0]:.5f}")

    # 8. Predictions and Inverse Transform
    pred = model.predict(X_test)
    pred = pred.reshape(pred.shape[0], horizon, num_features)

    # Invert scale
    pred_2d = pred.reshape(-1, num_features)
    pred_inverted = scaler.inverse_transform(pred_2d)
    pred_inverted = pred_inverted.reshape(pred.shape[0], pred.shape[1], pred.shape[2])

    y_test_inverted = scaler.inverse_transform(y_test.reshape(-1, num_features))
    y_test_inverted = y_test_inverted.reshape(y_test.shape[0], y_test.shape[1], y_test.shape[2])

    print("Predictions (sample 0):")
    print(pred_inverted[0])
    print("Actual (sample 0):")
    print(y_test_inverted[0])
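Once the model is trained, forecasting the next 10 steps beyond the end of the data needs only the most recent window, shaped as a batch of one. This is a minimal sketch of that input preparation; the helper name `latest_window` and the random stand-in for `scaled_data` are assumptions, and the model call itself is left out.

```python
import numpy as np

def latest_window(scaled, window_size):
    """Return the last `window_size` rows with a leading batch axis of size 1."""
    if len(scaled) < window_size:
        raise ValueError("not enough rows to build one window")
    return scaled[-window_size:][np.newaxis, ...]

# Random stand-in for scaled_data with the shape used in this guide.
scaled_demo = np.random.default_rng(1).random((853, 2))

x_last = latest_window(scaled_demo, window_size=20)
print(x_last.shape)  # (1, 20, 2)
```

Passing `x_last` to `model.predict` would then yield an array of shape (1, horizon * num_features), which can be reshaped and inverse-transformed the same way as the test predictions in the script.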
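That’s it! This end-to-end example shows how to load and scale the data, create input and target sequences, split them into train and test sets, build and train the LSTM, and evaluate its predictions after inverting the scaling.
You can adjust hyperparameters (e.g., window_size, horizon, number of LSTM units, learning rate, etc.) to improve performance for your specific dataset.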