使用Python实现LSTM模型,拟合流行病发病人数
参数 | 含义 |
---|---|
input_size | 输入特征的维度 |
hidden_size | 隐藏层的神经元数量 |
output_size | 输出的维度 |
num_heads | 多头注意力机制中头的数量 |
num_layers | Transformer层的数量 |
batch_size | 每批训练的数据样本数量 |
learning_rate | 优化器的学习率 |
epochs | 每次引导法训练的轮数 |
n_bootstrap | 引导法的迭代次数 |
Transformer模型的核心在于自注意力机制和多头注意力(Multi-Head Attention)。以下是主要的数学方程:
$$ \text{Attention}(Q, K, V) = \text{softmax}\left(\frac{QK^T}{\sqrt{d_k}}\right)V $$
其中:
$$ \text{MultiHead}(Q, K, V) = \text{Concat}(\text{head}_1, \dots, \text{head}_h)W^O $$
其中,每个头的计算如下:
$$ \text{head}_i = \text{Attention}(QW_i^Q, KW_i^K, VW_i^V) $$
$$ \text{FFN}(x) = \text{max}(0, xW_1 + b_1)W_2 + b_2 $$
在每个子层(如多头注意力和前馈网络)之后,Transformer使用残差连接和层归一化:
$$ \text{LayerNorm}(x + \text{Sublayer}(x)) $$
Transformer模型适用于以下疾病流行条件的预测:
全局依赖性:适合捕捉长期及全局的依赖关系,尤其是当疾病传播的规律涉及多个因素的复杂相互作用时。 大规模数据集:对于大规模的时间序列数据,Transformer能够高效地进行训练,并处理数据中的长程依赖。 并行计算需求高:当计算资源允许时,Transformer的并行性能够显著提高训练效率,适合大数据量的疾病预测任务。
在开始建模之前,需要配置环境,导入必要的库。
import torch
import torch.nn as nn
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import ParameterGrid
import matplotlib
import seaborn as sns
from sklearn.metrics import mean_squared_error, mean_absolute_error
from scipy.stats import shapiro
from statsmodels.graphics.tsaplots import plot_acf
from scipy.stats import probplot
from statsmodels.tsa.stattools import adfuller
# 设置中文字体
matplotlib.rcParams['font.sans-serif'] = ['Microsoft YaHei'] # Windows下的常见中文字体
matplotlib.rcParams['axes.unicode_minus'] = False # 正常显示负号
# 数据预处理
data = pd.read_excel('./daily_flu_counts.xlsx')
data = data[['日期', '发病数']].sort_values('日期').fillna(0)
# 使用线性插值填充缺失值
data['发病数'] = data['发病数'].interpolate(method='linear')
# 确保数据非负
data['发病数'] = data['发病数'].clip(lower=0)
# 确保数据非负
data['发病数'] = data['发病数'].clip(lower=0)
# 确保 '日期' 列为 datetime 类型
data['日期'] = pd.to_datetime(data['日期'])
# 绘制原数据的曲线图
plt.figure(figsize=(12, 6))
plt.plot(data['日期'], data['发病数'], label='发病数', color='blue')
plt.title('每日发病数曲线图')
plt.xlabel('日期')
plt.ylabel('发病数')
plt.legend()
plt.grid(True)
plt.xticks(rotation=45)
plt.show()
# 检查是否有可用的 GPU
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')
# 读取数据
data = pd.read_excel('./daily_flu_counts.xlsx')
data = data[['日期', '发病数']].sort_values('日期').fillna(0)
# 标准化数据
scaler = StandardScaler()
scaled_data = scaler.fit_transform(data['发病数'].values.reshape(-1, 1))
# 创建时间序列数据集
def create_dataset(dataset, look_back=1):
X, Y = [], []
for i in range(len(dataset) - look_back):
X.append(dataset[i:(i + look_back), 0])
Y.append(dataset[i + look_back, 0])
return np.array(X), np.array(Y)
look_back = 30 # 滑动窗口长度
X, Y = create_dataset(scaled_data, look_back)
# 拆分训练集和测试集
train_size = int(len(X) * 0.8)
X_train, X_test = X[:train_size], X[train_size:]
Y_train, Y_test = Y[:train_size], Y[train_size:]
# 转换为 PyTorch 张量
X_train = torch.tensor(X_train, dtype=torch.float32).view(-1, look_back, 1).to(device)
X_test = torch.tensor(X_test, dtype=torch.float32).view(-1, look_back, 1).to(device)
Y_train = torch.tensor(Y_train, dtype=torch.float32).view(-1, 1).to(device)
Y_test = torch.tensor(Y_test, dtype=torch.float32).view(-1, 1).to(device)
# 定义位置编码
class PositionalEncoding(nn.Module):
def __init__(self, d_model, max_len=5000):
super(PositionalEncoding, self).__init__()
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0) # [1, max_len, d_model]
self.register_buffer('pe', pe)
def forward(self, x):
"""
x: Tensor, shape [batch_size, seq_len, d_model]
"""
x = x + self.pe[:, :x.size(1), :]
return x
# 定义 Transformer 模型
class EnhancedTransformerModel(nn.Module):
def __init__(self, input_size=1, d_model=64, nhead=8, num_encoder_layers=3, dim_feedforward=256, output_size=1, dropout=0.1):
super(EnhancedTransformerModel, self).__init__()
self.input_linear = nn.Linear(input_size, d_model)
self.positional_encoding = PositionalEncoding(d_model)
encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead,
dim_feedforward=dim_feedforward, dropout=dropout)
self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_encoder_layers)
self.fc1 = nn.Linear(d_model, 64)
self.relu = nn.ReLU()
self.fc2 = nn.Linear(64, output_size)
def forward(self, x):
"""
x: Tensor, shape [batch_size, seq_len, input_size]
"""
x = self.input_linear(x) # [batch_size, seq_len, d_model]
x = self.positional_encoding(x) # 添加位置编码
x = x.transpose(0, 1) # [seq_len, batch_size, d_model]
x = self.transformer_encoder(x) # [seq_len, batch_size, d_model]
x = x.mean(dim=0) # [batch_size, d_model]
x = self.fc1(x)
x = self.relu(x)
x = self.fc2(x)
return x
# 超参数搜索的网格(移除 batch_size)
param_grid = {
'd_model': [64, 128],
'nhead': [4, 8],
'num_encoder_layers': [2, 3],
'dim_feedforward': [256, 512],
'learning_rate': [0.0001, 0.0005, 0.001]
}
# 评估函数(移除 batch_size)
def evaluate_model(d_model, nhead, num_encoder_layers, dim_feedforward, learning_rate):
# 初始化模型
model = EnhancedTransformerModel(
input_size=1,
d_model=d_model,
nhead=nhead,
num_encoder_layers=num_encoder_layers,
dim_feedforward=dim_feedforward
).to(device)
loss_function = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
# 训练模型
epochs = 100
model.train()
for epoch in range(epochs):
optimizer.zero_grad()
y_pred = model(X_train)
loss = loss_function(y_pred, Y_train)
loss.backward()
optimizer.step()
# 预测和计算损失
model.eval()
with torch.no_grad():
test_predictions = model(X_test).cpu().numpy()
test_predictions_scaled = scaler.inverse_transform(test_predictions)
Y_test_scaled = scaler.inverse_transform(Y_test.cpu().numpy())
mse = mean_squared_error(Y_test_scaled, test_predictions_scaled)
return mse
# 网格搜索
best_params = None
best_mse = float('inf')
for params in ParameterGrid(param_grid):
print(f'Evaluating with parameters: {params}')
mse = evaluate_model(**params)
print(f'MSE: {mse}')
if mse < best_mse:
best_mse = mse
best_params = params
print(f'Best parameters: {best_params}')
print(f'Best MSE: {best_mse}')
# 使用最佳参数实例化模型
model = EnhancedTransformerModel(
input_size=1,
d_model=best_params['d_model'],
nhead=best_params['nhead'],
num_encoder_layers=best_params['num_encoder_layers'],
dim_feedforward=best_params['dim_feedforward']
).to(device)
# 定义损失函数和优化器
loss_function = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=best_params['learning_rate'])
# 训练模型
epochs = 500
train_losses = []
for epoch in range(epochs):
model.train()
optimizer.zero_grad()
y_pred = model(X_train)
loss = loss_function(y_pred, Y_train)
loss.backward()
optimizer.step()
train_losses.append(loss.item())
if epoch % 50 == 0:
print(f'Epoch {epoch}, Loss: {loss.item()}')
# 绘制训练损失
plt.plot(train_losses, label="训练损失")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()
plt.show()
# 测试集预测
model.eval()
with torch.no_grad():
test_predictions = model(X_test).cpu().numpy()
# 逆归一化
test_predictions_scaled = scaler.inverse_transform(test_predictions)
Y_test_scaled = scaler.inverse_transform(Y_test.cpu().numpy())
# 确保预测值不小于0
test_predictions_scaled = np.clip(test_predictions_scaled, 0, None)
# 计算残差并中心化
residuals = Y_test_scaled.flatten() - test_predictions_scaled.flatten()
residuals_centered = residuals - residuals.mean()
# Bootstrap参数
n_bootstrap = 1000 # 重采样次数
bootstrap_predictions = []
np.random.seed(42) # 为了结果可复现
for _ in range(n_bootstrap):
# 从中心化的残差中有放回地采样
sampled_residuals = np.random.choice(residuals_centered, size=len(residuals_centered), replace=True)
# 生成新的预测样本
bootstrap_pred = test_predictions_scaled.flatten() + sampled_residuals
# 确保预测值不小于0
bootstrap_pred = np.clip(bootstrap_pred, 0, None)
bootstrap_predictions.append(bootstrap_pred)
bootstrap_predictions = np.array(bootstrap_predictions)
# 计算置信区间(例如 95%)
lower_percentile = 2.5
upper_percentile = 97.5
ci_lower = np.percentile(bootstrap_predictions, lower_percentile, axis=0)
ci_upper = np.percentile(bootstrap_predictions, upper_percentile, axis=0)
# 预测未来的步骤数(例如 30 天)
future_steps = 30
future_predictions = []
last_sequence = scaled_data[-look_back:].reshape(1, look_back, 1)
last_sequence = torch.tensor(last_sequence, dtype=torch.float32).to(device)
model.eval()
with torch.no_grad():
for _ in range(future_steps):
pred_scaled = model(last_sequence).cpu().numpy()
future_predictions.append(pred_scaled.flatten()[0])
# 更新序列:移除第一个元素,添加新的预测
new_sequence = np.append(last_sequence.cpu().numpy().flatten()[1:], pred_scaled)
last_sequence = torch.tensor(new_sequence.reshape(1, look_back, 1), dtype=torch.float32).to(device)
# 逆归一化
future_predictions_scaled = scaler.inverse_transform(np.array(future_predictions).reshape(-1, 1)).flatten()
future_predictions_scaled = np.clip(future_predictions_scaled, 0, None)
# Bootstrap 未来预测置信区间
bootstrap_future_predictions = []
for _ in range(n_bootstrap):
# 假设未来预测的残差分布与测试集相似
sampled_residuals = np.random.choice(residuals_centered, size=future_steps, replace=True)
bootstrap_future_pred = future_predictions_scaled + sampled_residuals
# 确保预测值不小于0
bootstrap_future_pred = np.clip(bootstrap_future_pred, 0, None)
bootstrap_future_predictions.append(bootstrap_future_pred)
bootstrap_future_predictions = np.array(bootstrap_future_predictions)
# 计算置信区间(例如 95%)
ci_lower_future = np.percentile(bootstrap_future_predictions, lower_percentile, axis=0)
ci_upper_future = np.percentile(bootstrap_future_predictions, upper_percentile, axis=0)
# 生成未来的日期
last_date = data['日期'].iloc[-1]
# 确保 last_date 是 pandas Timestamp 对象
if not isinstance(last_date, pd.Timestamp):
last_date = pd.to_datetime(last_date)
# 使用 start 和 periods 生成未来的日期,不使用 closed 参数
future_dates = pd.date_range(start=last_date + pd.Timedelta(days=1), periods=future_steps)
# 可视化结果
plt.figure(figsize=(12,6))
plt.plot(data['日期'], data['发病数'], label='历史数据')
plt.plot(data['日期'].iloc[-len(Y_test):], Y_test_scaled, label='实际值', color='green')
plt.plot(data['日期'].iloc[-len(Y_test):], test_predictions_scaled, label='预测值', color='red')
plt.fill_between(data['日期'].iloc[-len(Y_test):], ci_lower, ci_upper, color='orange', alpha=0.3, label='95% 置信区间')
plt.plot(future_dates, future_predictions_scaled, label='未来预测', color='blue')
plt.fill_between(future_dates, ci_lower_future, ci_upper_future, color='lightblue', alpha=0.3, label='95% 置信区间')
plt.xlabel('日期')
plt.ylabel('发病数')
plt.legend()
plt.show()
rmse = np.sqrt(mean_squared_error(Y_test_scaled, test_predictions_scaled))
mae = mean_absolute_error(Y_test_scaled, test_predictions_scaled)
mape = np.mean(np.abs((Y_test_scaled - test_predictions_scaled) / Y_test_scaled)) * 100
smape = np.mean(2 * np.abs(Y_test_scaled - test_predictions_scaled) / (np.abs(Y_test_scaled) + np.abs(test_predictions_scaled))) * 100
# 打印结果
print(f"RMSE: {rmse:.4f}")
print(f"MAE: {mae:.4f}")
print(f"MAPE: {mape:.2f}%")
print(f"SMAPE: {smape:.2f}%")
# 绘制残差的直方图
plt.figure(figsize=(10, 6))
sns.histplot(residuals, kde=True, bins=30, color='blue', stat='density')
plt.title('Residuals Distribution')
plt.xlabel('Residuals')
plt.ylabel('Density')
plt.show()
# 进行 Shapiro-Wilk 正态性检验
stat, p_value = shapiro(residuals)
print(f'Shapiro-Wilk Test: Stat={stat}, p-value={p_value}')
# 绘制残差的自相关函数(ACF)
plt.figure(figsize=(10, 6))
plot_acf(residuals, lags=30) # 选择lags的数量,根据需要调整
plt.title('ACF of Residuals')
plt.show()
# 绘制 Q-Q 图
plt.figure(figsize=(6, 6))
probplot(residuals_centered, dist="norm", plot=plt)
plt.title("Q-Q 图")
plt.show()
Epoch 50, Loss: 0.041903600096702576
Epoch 100, Loss: 0.03575131669640541
Epoch 150, Loss: 0.025699811056256294
Epoch 200, Loss: 0.019787732511758804
Epoch 250, Loss: 0.012308957055211067
Epoch 300, Loss: 0.010088082402944565
Epoch 350, Loss: 0.0069724190980196
Epoch 400, Loss: 0.0056532821618020535
Epoch 450, Loss: 0.0049051567912101746
Best parameters: {’d_model': 64, ‘dim_feedforward’: 512, ‘learning_rate’: 0.0005, ‘nhead’: 4, ‘num_encoder_layers’: 2}
RMSE: 54.3682
MAE: 42.9718
MAPE: 14.77%
SMAPE: 16.44%
模型评价指标
RMSE: 54.3682
MAE: 42.9718
MAPE: 14.77%
SMAPE: 16.44%
RMSE 和 MAE 都较高,这可能表明模型的整体预测能力还有待提高,本例数据本身的值较大且波动较大故仍在接受范围。 MAPE 和 SMAPE 表示中等的预测精度,14.77%和16.44% 的值表明模型的相对准确性可以接受,但仍有提升空间。
Shapiro-Wilk Test: Stat=0.9784532785415649, p-value=0.48873645067214966
绘制残差的直方图,并进行正态性检验(如 Shapiro-Wilk 测试)。如果残差分布近似于正态分布,表示模型可能已经捕捉到数据的所有信息。如果 p-value 很小(通常小于 0.05),说明残差不是正态分布,可能需要改进模型。本例p-value=0.488>0.05。
在理想情况下,模型的残差应该接近白噪声(即没有显著的自相关),所有滞后特征的自相关系数应该都落在置信区间内。本例绝大部分滞后特征的自相关系数都在置信区间内,仅有一个滞后特征超出,这可能是由于随机性或样本误差导致的,而不是模型本身的问题。
说明残差的大部分数据分布与正态分布较为接近,模型的假设总体上是合理的。
未引入例如周、月份、季节性等的时间特征。
当前模型可能存在过拟合风险,尤其是对小样本数据集。可对其增加正则化手段(L2 正则化、增加 Dropout 层)
①高计算复杂度:当序列长度较长(例如,超过几千)时,计算注意力权重矩阵会消耗大量内存,并显著增加训练时间。 ②Transformer 模型在时间序列预测中表现较好,但通常更适用于多维时间序列或具有复杂依赖关系的数据。如果数据相对简单(单变量时间序列),复杂的 Transformer 架构可能导致过拟合。 ③时间序列通常有明确的因果关系(即过去影响未来),而 Transformer 模型本质上是同时处理输入序列的所有时间步,并不直接捕捉因果性。 ④可解释性问题:Transformer 模型的注意力机制可以提供一定的可解释性(如观察哪些输入时间步对预测贡献更大),但相比于传统模型(如 ARIMA 或简单线性回归)的参数显著性,Transformer 的可解释性仍较弱。