通过Python实现LSTM模型,对流行病发病人数进行时间序列预测
参数 | 含义 |
---|---|
input_size | 输入特征的维度 |
hidden_size | 隐藏层的神经元数量 |
output_size | 输出的维度 |
num_layers | LSTM层的数量 |
sequence_length | 输入序列的长度(时间步数) |
batch_size | 每批训练的数据样本数量 |
learning_rate | 优化器的学习率 |
epochs | 每次引导法训练的轮数 |
n_bootstrap | 引导法的迭代次数 |
LSTM的关键更新方程可以简要表示为:
\begin{aligned}
f_t &= \sigma (W_f \cdot [h_{t-1}, x_t] + b_f),\\
i_t &= \sigma (W_i \cdot [h_{t-1}, x_t] + b_i),\\
\tilde{C}_t &= \tanh(W_C \cdot [h_{t-1}, x_t] + b_C),\\
C_t &= f_t \cdot C_{t-1} + i_t \cdot \tilde{C}_t,\\
o_t &= \sigma (W_o \cdot [h_{t-1}, x_t] + b_o),\\
h_t &= o_t \cdot \tanh(C_t)
\end{aligned}
LSTM模型适用于以下疾病流行条件的预测:
长期依赖性:适合用于具有长期依赖关系的时间序列数据,如季节性流感等。 复杂传播模式:对于传播过程复杂且变化多样的疾病,如COVID-19,LSTM能够捕捉其复杂的动态变化。 数据序列较长:适用于历史数据较长、需要从较远的时间步中获取信息的情况。
下面的代码展示了如何使用LSTM对某病发病数进行预测,并使用Bootstrap方法构建预测置信区间。
import torch
import torch.nn as nn
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from collections import deque
from sklearn.model_selection import ParameterGrid
from statsmodels.tsa.stattools import adfuller
from sklearn.metrics import mean_squared_error, mean_absolute_error
import seaborn as sns
from scipy.stats import shapiro
from scipy.stats import probplot
from statsmodels.graphics.tsaplots import plot_acf
import matplotlib.pyplot as plt
import matplotlib
# 设置中文字体
matplotlib.rcParams['font.sans-serif'] = ['Microsoft YaHei'] # Windows下的常见中文字体
matplotlib.rcParams['axes.unicode_minus'] = False # 正常显示负号
# 数据预处理
data = pd.read_excel('./daily_flu_counts.xlsx')
# 使用线性插值填充缺失值
data['发病数'] = data['发病数'].interpolate(method='linear')
# 确保数据非负
data['发病数'] = data['发病数'].clip(lower=0)
# 确保 '日期' 列为 datetime 类型
data['日期'] = pd.to_datetime(data['日期'])
# 绘制原数据的曲线图
plt.figure(figsize=(12, 6))
plt.plot(data['日期'], data['发病数'], label='发病数', color='blue')
plt.title('每日发病数曲线图')
plt.xlabel('日期')
plt.ylabel('发病数')
plt.legend()
plt.grid(True)
plt.xticks(rotation=45)
plt.show()
# 检查是否有可用的 GPU
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')
# 读取处理
data = pd.read_excel('./daily_flu_counts.xlsx')
data = data[['日期', '发病数']].sort_values('日期').fillna(0)
# 标准化数据
scaler = StandardScaler()
scaled_data = scaler.fit_transform(data['发病数'].values.reshape(-1, 1))
# 创建时间序列数据集
def create_dataset(dataset, look_back=1):
X, Y = [], []
for i in range(len(dataset) - look_back):
X.append(dataset[i:(i + look_back), 0])
Y.append(dataset[i + look_back, 0])
return np.array(X), np.array(Y)
look_back = 14 # 滑动窗口长度
X, Y = create_dataset(scaled_data, look_back)
# 拆分训练集和测试集
train_size = int(len(X) * 0.8)
X_train, X_test = X[:train_size], X[train_size:]
Y_train, Y_test = Y[:train_size], Y[train_size:]
# 转换为 PyTorch 张量
X_train = torch.tensor(X_train, dtype=torch.float32).view(-1, look_back, 1).to(device)
X_test = torch.tensor(X_test, dtype=torch.float32).view(-1, look_back, 1).to(device)
Y_train = torch.tensor(Y_train, dtype=torch.float32).view(-1, 1).to(device)
Y_test = torch.tensor(Y_test, dtype=torch.float32).view(-1, 1).to(device)
# 改进后的 LSTM 模型
class EnhancedLSTMModel(nn.Module):
def __init__(self, input_size=1, lstm_hidden_size=64, lstm_layers=1, output_size=1):
super(EnhancedLSTMModel, self).__init__()
self.lstm = nn.LSTM(input_size, lstm_hidden_size, lstm_layers, batch_first=True, dropout=0.2)
self.fc1 = nn.Linear(lstm_hidden_size, 64)
self.relu = nn.ReLU()
self.fc2 = nn.Linear(64, output_size)
def forward(self, x):
_, (hn, _) = self.lstm(x)
x = self.fc1(hn[-1])
x = self.relu(x)
x = self.fc2(x)
return x
# 超参数搜索的网格
param_grid = {
'lstm_hidden_size': [64, 128, 256],
'lstm_layers': [1, 2, 3],
'learning_rate': [0.0001, 0.0005, 0.001],
'batch_size': [16, 32, 64]
}
# 评估函数
def evaluate_model(lstm_hidden_size, lstm_layers, learning_rate, batch_size):
# 初始化模型
model = EnhancedLSTMModel(input_size=1, lstm_hidden_size=lstm_hidden_size, lstm_layers=lstm_layers).to(device)
loss_function = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
# 训练模型
epochs = 100
for epoch in range(epochs):
model.train()
optimizer.zero_grad()
y_pred = model(X_train)
loss = loss_function(y_pred, Y_train)
loss.backward()
optimizer.step()
# 预测和计算损失
model.eval()
with torch.no_grad():
test_predictions = model(X_test).cpu().numpy()
test_predictions_scaled = scaler.inverse_transform(test_predictions)
Y_test_scaled = scaler.inverse_transform(Y_test.cpu().numpy())
mse = mean_squared_error(Y_test_scaled, test_predictions_scaled)
return mse
# 网格搜索
best_params = None
best_mse = float('inf')
for params in ParameterGrid(param_grid):
print(f'Evaluating with parameters: {params}')
mse = evaluate_model(**params)
print(f'MSE: {mse}')
if mse < best_mse:
best_mse = mse
best_params = params
print(f'Best parameters: {best_params}')
print(f'Best MSE: {best_mse}')
# 使用最佳超参数初始化模型
best_lstm_hidden_size = best_params['lstm_hidden_size']
best_lstm_layers = best_params['lstm_layers']
best_learning_rate = best_params['learning_rate']
best_batch_size = best_params['batch_size']
# 初始化模型
model = EnhancedLSTMModel(
input_size=1,
lstm_hidden_size=best_lstm_hidden_size,
lstm_layers=best_lstm_layers
).to(device)
# 定义损失函数和优化器
loss_function = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=best_learning_rate)
# 训练模型
epochs = 300
train_losses = []
for epoch in range(epochs):
model.train()
optimizer.zero_grad()
y_pred = model(X_train)
loss = loss_function(y_pred, Y_train)
loss.backward()
optimizer.step()
train_losses.append(loss.item())
if epoch % 50 == 0:
print(f'Epoch {epoch}, Loss: {loss.item()}')
# 绘制训练损失
plt.plot(train_losses, label="训练损失")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()
plt.show()
# 测试集预测
model.eval()
with torch.no_grad():
test_predictions = model(X_test).cpu().numpy()
# 逆归一化
test_predictions_scaled = scaler.inverse_transform(test_predictions)
Y_test_scaled = scaler.inverse_transform(Y_test.cpu().numpy())
# 确保预测值不小于0
test_predictions_scaled = np.clip(test_predictions_scaled, 0, None)
# 计算残差并中心化
residuals = Y_test_scaled.flatten() - test_predictions_scaled.flatten()
residuals_centered = residuals - residuals.mean()
# Bootstrap 参数
n_bootstrap = 1000 # 重采样次数
bootstrap_predictions = []
np.random.seed(42) # 为了结果可复现
for _ in range(n_bootstrap):
# 从中心化的残差中有放回地采样
sampled_residuals = np.random.choice(residuals_centered, size=len(residuals_centered), replace=True)
# 生成新的预测样本
bootstrap_pred = test_predictions_scaled.flatten() + sampled_residuals
# 确保预测值不小于0
bootstrap_pred = np.clip(bootstrap_pred, 0, None)
bootstrap_predictions.append(bootstrap_pred)
bootstrap_predictions = np.array(bootstrap_predictions)
# 计算置信区间(例如 95%)
lower_percentile = 2.5
upper_percentile = 97.5
ci_lower = np.percentile(bootstrap_predictions, lower_percentile, axis=0)
ci_upper = np.percentile(bootstrap_predictions, upper_percentile, axis=0)
# 预测未来的步骤数(例如 30 天)
future_steps = 30
future_predictions = []
last_sequence = scaled_data[-look_back:].reshape(1, look_back, 1)
last_sequence = torch.tensor(last_sequence, dtype=torch.float32).to(device)
model.eval()
with torch.no_grad():
for _ in range(future_steps):
pred_scaled = model(last_sequence).cpu().numpy()
future_predictions.append(pred_scaled.flatten()[0])
# 更新序列:移除第一个元素,添加新的预测
new_sequence = np.append(last_sequence.cpu().numpy().flatten()[1:], pred_scaled)
last_sequence = torch.tensor(new_sequence.reshape(1, look_back, 1), dtype=torch.float32).to(device)
# 逆归一化
future_predictions_scaled = scaler.inverse_transform(np.array(future_predictions).reshape(-1, 1)).flatten()
future_predictions_scaled = np.clip(future_predictions_scaled, 0, None)
# Bootstrap未来预测置信区间
bootstrap_future_predictions = []
for _ in range(n_bootstrap):
# 假设未来预测的残差分布与测试集相似
sampled_residuals = np.random.choice(residuals_centered, size=future_steps, replace=True)
bootstrap_future_pred = future_predictions_scaled + sampled_residuals
# 确保预测值不小于0
bootstrap_future_pred = np.clip(bootstrap_future_pred, 0, None)
bootstrap_future_predictions.append(bootstrap_future_pred)
bootstrap_future_predictions = np.array(bootstrap_future_predictions)
# 计算置信区间(例如 95%)
ci_lower_future = np.percentile(bootstrap_future_predictions, lower_percentile, axis=0)
ci_upper_future = np.percentile(bootstrap_future_predictions, upper_percentile, axis=0)
# 生成未来的日期
last_date = data['日期'].iloc[-1]
# 确保 last_date 是 pandas Timestamp 对象
if not isinstance(last_date, pd.Timestamp):
last_date = pd.to_datetime(last_date)
# 使用 start 和 periods 生成未来的日期,不使用 closed 参数
future_dates = pd.date_range(start=last_date + pd.Timedelta(days=1), periods=future_steps)
# 可视化训练集曲线、测试集预测与置信区间、以及未来预测与置信区间
plt.figure(figsize=(14, 7))
# 绘制训练集曲线
plt.plot(data['日期'][:train_size + look_back], scaler.inverse_transform(scaled_data[:train_size + look_back]), label='训练集', color='gray', linestyle='--')
# 绘制测试集预测值和实际值
plt.plot(data['日期'].iloc[train_size + look_back:], Y_test_scaled, label='实际值', color='blue')
plt.plot(data['日期'].iloc[train_size + look_back:], test_predictions_scaled, label='预测值', color='red')
plt.fill_between(data['日期'].iloc[train_size + look_back:], ci_lower, ci_upper, color='pink', alpha=0.5, label='95% 置信区间')
# 绘制未来预测值和置信区间
plt.plot(future_dates, future_predictions_scaled, label='未来预测值', color='green')
plt.fill_between(future_dates, ci_lower_future, ci_upper_future, color='lightgreen', alpha=0.5, label='95% 置信区间')
# 添加图例、标题和标签
plt.xlabel('日期')
plt.ylabel('发病数')
plt.title('某病LSTM预测并用bootstrap增加置信区间')
plt.legend()
# 显示图像
plt.show()
# 计算RMSE
rmse = np.sqrt(mean_squared_error(Y_test_scaled, test_predictions_scaled))
print(f'RMSE: {rmse}')
# 计算MAE
mae = mean_absolute_error(Y_test_scaled, test_predictions_scaled)
print(f'MAE: {mae}')
# 计算MAPE
mape = np.mean(np.abs((Y_test_scaled - test_predictions_scaled) / Y_test_scaled)) * 100
print(f'MAPE: {mape}%')
# 计算SMAPE
smape = np.mean(2 * np.abs(Y_test_scaled - test_predictions_scaled) / (np.abs(Y_test_scaled) + np.abs(test_predictions_scaled))) * 100
print(f'SMAPE: {smape}%')
# 绘制残差的直方图
plt.figure(figsize=(10, 6))
sns.histplot(residuals, kde=True, bins=30, color='blue', stat='density')
plt.title('Residuals Distribution')
plt.xlabel('Residuals')
plt.ylabel('Density')
plt.show()
# 进行 Shapiro-Wilk 正态性检验
stat, p_value = shapiro(residuals)
print(f'Shapiro-Wilk Test: Stat={stat}, p-value={p_value}')
# 绘制残差的自相关函数(ACF)
plt.figure(figsize=(10, 6))
plot_acf(residuals, lags=30) # 选择lags的数量,根据需要调整
plt.title('ACF of Residuals')
plt.show()
# 残差与预测值的散点图
plt.figure(figsize=(10, 6))
plt.scatter(test_predictions_scaled.flatten(), residuals, color='blue', alpha=0.5)
plt.title('Residuals vs Predicted Values')
plt.xlabel('Predicted Values')
plt.ylabel('Residuals')
plt.show()
# 绘制 Q-Q 图
plt.figure(figsize=(6, 6))
probplot(residuals_centered, dist="norm", plot=plt)
plt.title("Q-Q 图")
plt.show()
Best parameters: {'batch_size': 32, 'learning_rate': 0.001, 'lstm_hidden_size': 64, 'lstm_layers': 1}
Epoch 0, Loss: 1.1413506269454956
Epoch 50, Loss: 0.16259007155895233
Epoch 100, Loss: 0.06618689000606537
Epoch 150, Loss: 0.052911654114723206
Epoch 200, Loss: 0.048041947185993195
Epoch 250, Loss: 0.04660336673259735
模型评价指标 RMSE: 51.99990463256836 MAE: 42.11808395385742 MAPE: 16.316121816635132% SMAPE: 14.625115692615509%
RMSE 和 MAE 都较高,这可能表明模型的整体预测能力还有待提高,本例数据本身的值较大且波动较大故仍在接受范围。 MAPE 和 SMAPE 表示中等的预测精度,16.31%和14.62%的值表明模型的相对准确性可以接受,但仍有提升空间。
Shapiro-Wilk Test: Stat=0.9668310880661011, p-value=0.13944154977798462
绘制残差的直方图,并进行正态性检验(如 Shapiro-Wilk 测试)。如果残差分布近似于正态分布,表示模型可能已经捕捉到数据的所有信息。如果 p-value 很小(通常小于 0.05),说明残差不是正态分布,可能需要改进模型。本例p-value=0.139>0.05。
残差的自相关性可以通过计算自相关函数(ACF)来检验。如果残差没有自相关性,那么可以认为模型的误差是随机的,不含有可预测的信息。
如果残差与预测值之间存在显著的模式,可能意味着模型未能完全拟合数据,存在一定的系统误差。如果散点图中残差与预测值之间存在系统性的趋势(例如,呈现某种形状),那么模型可能存在欠拟合的问题。本例散点图无明显形态。
Q-Q图进一步验证了残差的正态性。大部分点紧密分布在参考线附近,表明残差与正态分布吻合良好。这说明模型的预测误差符合正态分布的假设,有助于后续的统计分析和置信区间的构建。
①拟合精度:通过RMSE、MAE、MAPE以及SMAPE等指标,可以看出模型具有一定的预测能力,但在一些极端值或振荡幅度较大的阶段,仍可能存在较大误差。
②残差分析:残差分布较为接近正态分布,且自相关程度较低,说明模型对大部分时段的拟合较好。
③Bootstrap置信区间:对预测结果的Bootstrap方法可量化未来预测的不确定性,为决策者提供风险区间。
④LSTM虽然可以处理非平稳数据,但对非平稳数据进行预处理可以得到更好的预测结果。
①更复杂的特征工程:可考虑引入气候、人口流动、卫生干预措施等外部变量。
②正则化与早停:在多层LSTM中使用Dropout或权重衰减,结合Early Stopping降低过拟合风险。
③混合模型:将LSTM与ARIMA或XGBoost等传统统计/机器学习模型相结合,以获得更高的精度。
①LSTM模型能够捕捉时间序列中的非线性关系,但是它的能力有限,特别是在面对复杂的非线性数据时。对于某些非线性行为,模型可能无法完全捕捉到其内在的规律,导致误差。
②模型泛化能力:
过拟合:在训练过程中,如果LSTM模型过于复杂(例如过多的隐藏层或过大的隐藏层单元数),它可能会在训练集上表现得很好,但在测试集上表现较差。过拟合意味着模型学到了训练数据中的噪声,而不是数据的实际模式。
训练集和测试集的分布差异:LSTM模型假设训练集和测试集的分布相似,但在实际应用中,尤其是面对季节性疾病(如流感)的预测时,训练集和测试集的分布可能会发生变化(例如季节变化、突发事件等),从而影响模型的泛化能力。
③LSTM模型通常假设数据序列中的时间步之间存在某种程度的依赖关系,但对于某些类型的时间序列,特别是那些含有强烈外部因素(例如节假日、政策变化等)的序列,LSTM可能无法完全捕捉这些因素的影响。