使用LSTM预测某病发病数并用Bootstrap增加置信区间

通过Python实现LSTM模型,对流行病发病人数进行时间序列预测


日期

LSTM模型

1 概述

1.1 模型原理

1.2 模型假设与应用

1.3 模型参数及含义

参数 含义
input_size 输入特征的维度
hidden_size 隐藏层的神经元数量
output_size 输出的维度
num_layers LSTM层的数量
sequence_length 输入序列的长度(时间步数)
batch_size 每批训练的数据样本数量
learning_rate 优化器的学习率
epochs 每次引导法训练的轮数
n_bootstrap 引导法的迭代次数

2 模型流程

2.1 LSTM模型结构

  1. 输入层:将滑动窗口(look_back)后的序列输入到LSTM层。
  2. LSTM层:多个隐藏单元以及若干层堆叠(lstm_layers可选),捕捉时间依赖性。
  3. 全连接层:将LSTM的隐藏层输出映射到预测值输出。
  4. 激活函数:常见使用ReLU或其他激活函数,帮助网络学习非线性关系。
  5. 输出层:输出下一步的发病数预测值。

2.2LSTM模型流程图

png png

2.3 模型方程

LSTM的关键更新方程可以简要表示为:
\begin{aligned} f_t &= \sigma (W_f \cdot [h_{t-1}, x_t] + b_f),\\
i_t &= \sigma (W_i \cdot [h_{t-1}, x_t] + b_i),\\
\tilde{C}_t &= \tanh(W_C \cdot [h_{t-1}, x_t] + b_C),\\
C_t &= f_t \cdot C_{t-1} + i_t \cdot \tilde{C}_t,\\
o_t &= \sigma (W_o \cdot [h_{t-1}, x_t] + b_o),\\
h_t &= o_t \cdot \tanh(C_t) \end{aligned}

3 适用疾病流行条件

LSTM模型适用于以下疾病流行条件的预测:

长期依赖性:适合用于具有长期依赖关系的时间序列数据,如季节性流感等。 复杂传播模式:对于传播过程复杂且变化多样的疾病,如COVID-19,LSTM能够捕捉其复杂的动态变化。 数据序列较长:适用于历史数据较长、需要从较远的时间步中获取信息的情况。

4 实验设计及代码实现

下面的代码展示了如何使用LSTM对某病发病数进行预测,并使用Bootstrap方法构建预测置信区间。

4.1 包的导入

import torch
import torch.nn as nn
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from collections import deque
from sklearn.model_selection import ParameterGrid
from statsmodels.tsa.stattools import adfuller
from sklearn.metrics import mean_squared_error, mean_absolute_error
import seaborn as sns
from scipy.stats import shapiro
from scipy.stats import probplot
from statsmodels.graphics.tsaplots import plot_acf
import matplotlib.pyplot as plt
import matplotlib
# 设置中文字体
matplotlib.rcParams['font.sans-serif'] = ['Microsoft YaHei']  # Windows下的常见中文字体
matplotlib.rcParams['axes.unicode_minus'] = False  # 正常显示负号

4.2 数据预处理

# 数据预处理
data = pd.read_excel('./daily_flu_counts.xlsx')
# 使用线性插值填充缺失值
data['发病数'] = data['发病数'].interpolate(method='linear')
# 确保数据非负
data['发病数'] = data['发病数'].clip(lower=0)

# 确保 '日期' 列为 datetime 类型
data['日期'] = pd.to_datetime(data['日期'])


# 绘制原数据的曲线图
plt.figure(figsize=(12, 6))
plt.plot(data['日期'], data['发病数'], label='发病数', color='blue')


plt.title('每日发病数曲线图')
plt.xlabel('日期')
plt.ylabel('发病数')
plt.legend()
plt.grid(True)
plt.xticks(rotation=45)
plt.show()

4.3模型的训练


# 检查是否有可用的 GPU
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')

# 读取处理
data = pd.read_excel('./daily_flu_counts.xlsx')
data = data[['日期', '发病数']].sort_values('日期').fillna(0)

# 标准化数据
scaler = StandardScaler()
scaled_data = scaler.fit_transform(data['发病数'].values.reshape(-1, 1))

# 创建时间序列数据集
def create_dataset(dataset, look_back=1):
    X, Y = [], []
    for i in range(len(dataset) - look_back):
        X.append(dataset[i:(i + look_back), 0])
        Y.append(dataset[i + look_back, 0])
    return np.array(X), np.array(Y)

look_back = 14  # 滑动窗口长度
X, Y = create_dataset(scaled_data, look_back)

# 拆分训练集和测试集
train_size = int(len(X) * 0.8)
X_train, X_test = X[:train_size], X[train_size:]
Y_train, Y_test = Y[:train_size], Y[train_size:]

# 转换为 PyTorch 张量
X_train = torch.tensor(X_train, dtype=torch.float32).view(-1, look_back, 1).to(device)
X_test = torch.tensor(X_test, dtype=torch.float32).view(-1, look_back, 1).to(device)
Y_train = torch.tensor(Y_train, dtype=torch.float32).view(-1, 1).to(device)
Y_test = torch.tensor(Y_test, dtype=torch.float32).view(-1, 1).to(device)

# 改进后的 LSTM 模型
class EnhancedLSTMModel(nn.Module):
    def __init__(self, input_size=1, lstm_hidden_size=64, lstm_layers=1, output_size=1):
        super(EnhancedLSTMModel, self).__init__()
        self.lstm = nn.LSTM(input_size, lstm_hidden_size, lstm_layers, batch_first=True, dropout=0.2)
        self.fc1 = nn.Linear(lstm_hidden_size, 64)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(64, output_size)

    def forward(self, x):
        _, (hn, _) = self.lstm(x)
        x = self.fc1(hn[-1])
        x = self.relu(x)
        x = self.fc2(x)
        return x

# 超参数搜索的网格
param_grid = {
    'lstm_hidden_size': [64, 128, 256],
    'lstm_layers': [1, 2, 3],
    'learning_rate': [0.0001, 0.0005, 0.001],
    'batch_size': [16, 32, 64]
}

# 评估函数
def evaluate_model(lstm_hidden_size, lstm_layers, learning_rate, batch_size):
    # 初始化模型
    model = EnhancedLSTMModel(input_size=1, lstm_hidden_size=lstm_hidden_size, lstm_layers=lstm_layers).to(device)
    loss_function = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    # 训练模型
    epochs = 100
    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()
        y_pred = model(X_train)
        loss = loss_function(y_pred, Y_train)
        loss.backward()
        optimizer.step()

    # 预测和计算损失
    model.eval()
    with torch.no_grad():
        test_predictions = model(X_test).cpu().numpy()
    test_predictions_scaled = scaler.inverse_transform(test_predictions)
    Y_test_scaled = scaler.inverse_transform(Y_test.cpu().numpy())
    mse = mean_squared_error(Y_test_scaled, test_predictions_scaled)
    
    return mse

# 网格搜索
best_params = None
best_mse = float('inf')

for params in ParameterGrid(param_grid):
    print(f'Evaluating with parameters: {params}')
    mse = evaluate_model(**params)
    print(f'MSE: {mse}')
    
    if mse < best_mse:
        best_mse = mse
        best_params = params

print(f'Best parameters: {best_params}')
print(f'Best MSE: {best_mse}')

# 使用最佳超参数初始化模型
best_lstm_hidden_size = best_params['lstm_hidden_size']
best_lstm_layers = best_params['lstm_layers']
best_learning_rate = best_params['learning_rate']
best_batch_size = best_params['batch_size']

# 初始化模型
model = EnhancedLSTMModel(
    input_size=1,
    lstm_hidden_size=best_lstm_hidden_size,
    lstm_layers=best_lstm_layers
).to(device)

# 定义损失函数和优化器
loss_function = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=best_learning_rate)

# 训练模型
epochs = 300
train_losses = []
for epoch in range(epochs):
    model.train()
    optimizer.zero_grad()
    y_pred = model(X_train)
    loss = loss_function(y_pred, Y_train)
    loss.backward()
    optimizer.step()

    train_losses.append(loss.item())
    if epoch % 50 == 0:
        print(f'Epoch {epoch}, Loss: {loss.item()}')

# 绘制训练损失
plt.plot(train_losses, label="训练损失")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()
plt.show()

# 测试集预测
model.eval()
with torch.no_grad():
    test_predictions = model(X_test).cpu().numpy()

# 逆归一化
test_predictions_scaled = scaler.inverse_transform(test_predictions)
Y_test_scaled = scaler.inverse_transform(Y_test.cpu().numpy())

# 确保预测值不小于0
test_predictions_scaled = np.clip(test_predictions_scaled, 0, None)

# 计算残差并中心化
residuals = Y_test_scaled.flatten() - test_predictions_scaled.flatten()
residuals_centered = residuals - residuals.mean()

# Bootstrap 参数
n_bootstrap = 1000  # 重采样次数
bootstrap_predictions = []

np.random.seed(42)  # 为了结果可复现

for _ in range(n_bootstrap):
    # 从中心化的残差中有放回地采样
    sampled_residuals = np.random.choice(residuals_centered, size=len(residuals_centered), replace=True)
    # 生成新的预测样本
    bootstrap_pred = test_predictions_scaled.flatten() + sampled_residuals
    # 确保预测值不小于0
    bootstrap_pred = np.clip(bootstrap_pred, 0, None)
    bootstrap_predictions.append(bootstrap_pred)

bootstrap_predictions = np.array(bootstrap_predictions)

# 计算置信区间(例如 95%)
lower_percentile = 2.5
upper_percentile = 97.5
ci_lower = np.percentile(bootstrap_predictions, lower_percentile, axis=0)
ci_upper = np.percentile(bootstrap_predictions, upper_percentile, axis=0)

# 预测未来的步骤数(例如 30 天)
future_steps = 30
future_predictions = []
last_sequence = scaled_data[-look_back:].reshape(1, look_back, 1)
last_sequence = torch.tensor(last_sequence, dtype=torch.float32).to(device)

model.eval()
with torch.no_grad():
    for _ in range(future_steps):
        pred_scaled = model(last_sequence).cpu().numpy()
        future_predictions.append(pred_scaled.flatten()[0])
        # 更新序列:移除第一个元素,添加新的预测
        new_sequence = np.append(last_sequence.cpu().numpy().flatten()[1:], pred_scaled)
        last_sequence = torch.tensor(new_sequence.reshape(1, look_back, 1), dtype=torch.float32).to(device)

# 逆归一化
future_predictions_scaled = scaler.inverse_transform(np.array(future_predictions).reshape(-1, 1)).flatten()
future_predictions_scaled = np.clip(future_predictions_scaled, 0, None)

# Bootstrap未来预测置信区间
bootstrap_future_predictions = []

for _ in range(n_bootstrap):
    # 假设未来预测的残差分布与测试集相似
    sampled_residuals = np.random.choice(residuals_centered, size=future_steps, replace=True)
    bootstrap_future_pred = future_predictions_scaled + sampled_residuals
    # 确保预测值不小于0
    bootstrap_future_pred = np.clip(bootstrap_future_pred, 0, None)
    bootstrap_future_predictions.append(bootstrap_future_pred)

bootstrap_future_predictions = np.array(bootstrap_future_predictions)

# 计算置信区间(例如 95%)
ci_lower_future = np.percentile(bootstrap_future_predictions, lower_percentile, axis=0)
ci_upper_future = np.percentile(bootstrap_future_predictions, upper_percentile, axis=0)

# 生成未来的日期
last_date = data['日期'].iloc[-1]

# 确保 last_date 是 pandas Timestamp 对象
if not isinstance(last_date, pd.Timestamp):
    last_date = pd.to_datetime(last_date)

# 使用 start 和 periods 生成未来的日期,不使用 closed 参数
future_dates = pd.date_range(start=last_date + pd.Timedelta(days=1), periods=future_steps)


4.4 结果制图


# 可视化训练集曲线、测试集预测与置信区间、以及未来预测与置信区间
plt.figure(figsize=(14, 7))

# 绘制训练集曲线
plt.plot(data['日期'][:train_size + look_back], scaler.inverse_transform(scaled_data[:train_size + look_back]), label='训练集', color='gray', linestyle='--')

# 绘制测试集预测值和实际值
plt.plot(data['日期'].iloc[train_size + look_back:], Y_test_scaled, label='实际值', color='blue')
plt.plot(data['日期'].iloc[train_size + look_back:], test_predictions_scaled, label='预测值', color='red')
plt.fill_between(data['日期'].iloc[train_size + look_back:], ci_lower, ci_upper, color='pink', alpha=0.5, label='95% 置信区间')

# 绘制未来预测值和置信区间
plt.plot(future_dates, future_predictions_scaled, label='未来预测值', color='green')
plt.fill_between(future_dates, ci_lower_future, ci_upper_future, color='lightgreen', alpha=0.5, label='95% 置信区间')

# 添加图例、标题和标签
plt.xlabel('日期')
plt.ylabel('发病数')
plt.title('某病LSTM预测并用bootstrap增加置信区间')
plt.legend()

# 显示图像
plt.show()

4.5 模型评价指标与残差分析可视化


# 计算RMSE
rmse = np.sqrt(mean_squared_error(Y_test_scaled, test_predictions_scaled))
print(f'RMSE: {rmse}')

# 计算MAE
mae = mean_absolute_error(Y_test_scaled, test_predictions_scaled)
print(f'MAE: {mae}')

# 计算MAPE
mape = np.mean(np.abs((Y_test_scaled - test_predictions_scaled) / Y_test_scaled)) * 100
print(f'MAPE: {mape}%')

# 计算SMAPE
smape = np.mean(2 * np.abs(Y_test_scaled - test_predictions_scaled) / (np.abs(Y_test_scaled) + np.abs(test_predictions_scaled))) * 100
print(f'SMAPE: {smape}%')

# 绘制残差的直方图
plt.figure(figsize=(10, 6))
sns.histplot(residuals, kde=True, bins=30, color='blue', stat='density')
plt.title('Residuals Distribution')
plt.xlabel('Residuals')
plt.ylabel('Density')
plt.show()

# 进行 Shapiro-Wilk 正态性检验
stat, p_value = shapiro(residuals)
print(f'Shapiro-Wilk Test: Stat={stat}, p-value={p_value}')

# 绘制残差的自相关函数(ACF)
plt.figure(figsize=(10, 6))
plot_acf(residuals, lags=30)  # 选择lags的数量,根据需要调整
plt.title('ACF of Residuals')
plt.show()

# 残差与预测值的散点图
plt.figure(figsize=(10, 6))
plt.scatter(test_predictions_scaled.flatten(), residuals, color='blue', alpha=0.5)
plt.title('Residuals vs Predicted Values')
plt.xlabel('Predicted Values')
plt.ylabel('Residuals')
plt.show()

# 绘制 Q-Q 图
plt.figure(figsize=(6, 6))
probplot(residuals_centered, dist="norm", plot=plt)
plt.title("Q-Q 图")
plt.show()

5 结果

5.1数据预处理输出结果

png

5.2 超参数网格搜索与训练损失

Best parameters: {'batch_size': 32, 'learning_rate': 0.001, 'lstm_hidden_size': 64, 'lstm_layers': 1}
Epoch 0, Loss: 1.1413506269454956
Epoch 50, Loss: 0.16259007155895233
Epoch 100, Loss: 0.06618689000606537
Epoch 150, Loss: 0.052911654114723206
Epoch 200, Loss: 0.048041947185993195
Epoch 250, Loss: 0.04660336673259735

png

5.3 模型预测结果

png

5.4 模型评价指标与残差分析结果

5.4.1 模型评价指标

模型评价指标 RMSE: 51.99990463256836 MAE: 42.11808395385742 MAPE: 16.316121816635132% SMAPE: 14.625115692615509%

RMSE 和 MAE 都较高,这可能表明模型的整体预测能力还有待提高,本例数据本身的值较大且波动较大故仍在接受范围。 MAPE 和 SMAPE 表示中等的预测精度,16.31%和14.62%的值表明模型的相对准确性可以接受,但仍有提升空间。

5.4.2 绘制残差的直方图与正态性检验

png

Shapiro-Wilk Test: Stat=0.9668310880661011, p-value=0.13944154977798462

绘制残差的直方图,并进行正态性检验(如 Shapiro-Wilk 测试)。如果残差分布近似于正态分布,表示模型可能已经捕捉到数据的所有信息。如果 p-value 很小(通常小于 0.05),说明残差不是正态分布,可能需要改进模型。本例p-value=0.139>0.05。

5.4.3 自相关函数 (ACF) 图

png

残差的自相关性可以通过计算自相关函数(ACF)来检验。如果残差没有自相关性,那么可以认为模型的误差是随机的,不含有可预测的信息。

5.4.4 残差散点图

png

如果残差与预测值之间存在显著的模式,可能意味着模型未能完全拟合数据,存在一定的系统误差。如果散点图中残差与预测值之间存在系统性的趋势(例如,呈现某种形状),那么模型可能存在欠拟合的问题。本例散点图无明显形态。

5.4.5 Q-Q 图

png

Q-Q图进一步验证了残差的正态性。大部分点紧密分布在参考线附近,表明残差与正态分布吻合良好。这说明模型的预测误差符合正态分布的假设,有助于后续的统计分析和置信区间的构建。

5.5 讨论与改进

①拟合精度:通过RMSE、MAE、MAPE以及SMAPE等指标,可以看出模型具有一定的预测能力,但在一些极端值或振荡幅度较大的阶段,仍可能存在较大误差。

②残差分析:残差分布较为接近正态分布,且自相关程度较低,说明模型对大部分时段的拟合较好。

③Bootstrap置信区间:对预测结果的Bootstrap方法可量化未来预测的不确定性,为决策者提供风险区间。

④LSTM虽然可以处理非平稳数据,但对非平稳数据进行预处理可以得到更好的预测结果。

5.5.1模型改进方向

①更复杂的特征工程:可考虑引入气候、人口流动、卫生干预措施等外部变量。

②正则化与早停:在多层LSTM中使用Dropout或权重衰减,结合Early Stopping降低过拟合风险。

③混合模型:将LSTM与ARIMA或XGBoost等传统统计/机器学习模型相结合,以获得更高的精度。

5.5.2LSTM局限性

①LSTM模型能够捕捉时间序列中的非线性关系,但是它的能力有限,特别是在面对复杂的非线性数据时。对于某些非线性行为,模型可能无法完全捕捉到其内在的规律,导致误差。

②模型泛化能力:

过拟合:在训练过程中,如果LSTM模型过于复杂(例如过多的隐藏层或过大的隐藏层单元数),它可能会在训练集上表现得很好,但在测试集上表现较差。过拟合意味着模型学到了训练数据中的噪声,而不是数据的实际模式。

训练集和测试集的分布差异:LSTM模型假设训练集和测试集的分布相似,但在实际应用中,尤其是面对季节性疾病(如流感)的预测时,训练集和测试集的分布可能会发生变化(例如季节变化、突发事件等),从而影响模型的泛化能力。

③LSTM模型通常假设数据序列中的时间步之间存在某种程度的依赖关系,但对于某些类型的时间序列,特别是那些含有强烈外部因素(例如节假日、政策变化等)的序列,LSTM可能无法完全捕捉这些因素的影响。

胡子欣
拉夫克劳保安