Transformer模型并用Bootstrap增加置信区间

使用Python实现LSTM模型,拟合流行病发病人数


日期

Transformer模型

1 概述

1.1 模型原理

1.2 模型假设与应用

1.3 模型参数及含义

参数 含义
input_size 输入特征的维度
hidden_size 隐藏层的神经元数量
output_size 输出的维度
num_heads 多头注意力机制中头的数量
num_layers Transformer层的数量
batch_size 每批训练的数据样本数量
learning_rate 优化器的学习率
epochs 每次引导法训练的轮数
n_bootstrap 引导法的迭代次数

2 模型流程图及方程

2.1 模型流程图

png

2.2 Transformer模型方程

Transformer模型的核心在于自注意力机制和多头注意力(Multi-Head Attention)。以下是主要的数学方程:

2.2.1 自注意力机制(Self-Attention)

$$ \text{Attention}(Q, K, V) = \text{softmax}\left(\frac{QK^T}{\sqrt{d_k}}\right)V $$

其中:

2.2.2 多头注意力(Multi-Head Attention)

$$ \text{MultiHead}(Q, K, V) = \text{Concat}(\text{head}_1, \dots, \text{head}_h)W^O $$

其中,每个头的计算如下:

$$ \text{head}_i = \text{Attention}(QW_i^Q, KW_i^K, VW_i^V) $$

2.2.3 前馈全连接网络(Position-wise Feed-Forward Networks)

$$ \text{FFN}(x) = \text{max}(0, xW_1 + b_1)W_2 + b_2 $$

2.2.4 残差连接与层归一化(Residual Connection & Layer Normalization)

在每个子层(如多头注意力和前馈网络)之后,Transformer使用残差连接和层归一化:

$$ \text{LayerNorm}(x + \text{Sublayer}(x)) $$


3 适用疾病流行条件

Transformer模型适用于以下疾病流行条件的预测:

全局依赖性:适合捕捉长期及全局的依赖关系,尤其是当疾病传播的规律涉及多个因素的复杂相互作用时。 大规模数据集:对于大规模的时间序列数据,Transformer能够高效地进行训练,并处理数据中的长程依赖。 并行计算需求高:当计算资源允许时,Transformer的并行性能够显著提高训练效率,适合大数据量的疾病预测任务。


4 代码(Python)

4.1 条件设定

在开始建模之前,需要配置环境,导入必要的库。

import torch
import torch.nn as nn
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import ParameterGrid
import matplotlib
import seaborn as sns
from sklearn.metrics import mean_squared_error, mean_absolute_error
from scipy.stats import shapiro
from statsmodels.graphics.tsaplots import plot_acf
from scipy.stats import probplot
from statsmodels.tsa.stattools import adfuller

# 设置中文字体
matplotlib.rcParams['font.sans-serif'] = ['Microsoft YaHei']  # Windows下的常见中文字体
matplotlib.rcParams['axes.unicode_minus'] = False  # 正常显示负号

4.2 数据预处理


# 数据预处理
data = pd.read_excel('./daily_flu_counts.xlsx')
data = data[['日期', '发病数']].sort_values('日期').fillna(0)


# 使用线性插值填充缺失值
data['发病数'] = data['发病数'].interpolate(method='linear')
# 确保数据非负
data['发病数'] = data['发病数'].clip(lower=0)
# 确保数据非负
data['发病数'] = data['发病数'].clip(lower=0)
# 确保 '日期' 列为 datetime 类型
data['日期'] = pd.to_datetime(data['日期'])

# 绘制原数据的曲线图
plt.figure(figsize=(12, 6))
plt.plot(data['日期'], data['发病数'], label='发病数', color='blue')


plt.title('每日发病数曲线图')
plt.xlabel('日期')
plt.ylabel('发病数')
plt.legend()
plt.grid(True)
plt.xticks(rotation=45)
plt.show()

4.3 模型训练


# 检查是否有可用的 GPU
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')

# 读取数据
data = pd.read_excel('./daily_flu_counts.xlsx')
data = data[['日期', '发病数']].sort_values('日期').fillna(0)

# 标准化数据
scaler = StandardScaler()
scaled_data = scaler.fit_transform(data['发病数'].values.reshape(-1, 1))

# 创建时间序列数据集
def create_dataset(dataset, look_back=1):
    X, Y = [], []
    for i in range(len(dataset) - look_back):
        X.append(dataset[i:(i + look_back), 0])
        Y.append(dataset[i + look_back, 0])
    return np.array(X), np.array(Y)

look_back = 30  # 滑动窗口长度
X, Y = create_dataset(scaled_data, look_back)

# 拆分训练集和测试集
train_size = int(len(X) * 0.8)
X_train, X_test = X[:train_size], X[train_size:]
Y_train, Y_test = Y[:train_size], Y[train_size:]

# 转换为 PyTorch 张量
X_train = torch.tensor(X_train, dtype=torch.float32).view(-1, look_back, 1).to(device)
X_test = torch.tensor(X_test, dtype=torch.float32).view(-1, look_back, 1).to(device)
Y_train = torch.tensor(Y_train, dtype=torch.float32).view(-1, 1).to(device)
Y_test = torch.tensor(Y_test, dtype=torch.float32).view(-1, 1).to(device)

# 定义位置编码
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        
        pe = pe.unsqueeze(0)  # [1, max_len, d_model]
        self.register_buffer('pe', pe)
    
    def forward(self, x):
        """
        x: Tensor, shape [batch_size, seq_len, d_model]
        """
        x = x + self.pe[:, :x.size(1), :]
        return x

# 定义 Transformer 模型
class EnhancedTransformerModel(nn.Module):
    def __init__(self, input_size=1, d_model=64, nhead=8, num_encoder_layers=3, dim_feedforward=256, output_size=1, dropout=0.1):
        super(EnhancedTransformerModel, self).__init__()
        self.input_linear = nn.Linear(input_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model)
        
        encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead, 
                                                   dim_feedforward=dim_feedforward, dropout=dropout)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_encoder_layers)
        
        self.fc1 = nn.Linear(d_model, 64)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(64, output_size)
    
    def forward(self, x):
        """
        x: Tensor, shape [batch_size, seq_len, input_size]
        """
        x = self.input_linear(x)  # [batch_size, seq_len, d_model]
        x = self.positional_encoding(x)  # 添加位置编码
        x = x.transpose(0, 1)  # [seq_len, batch_size, d_model]
        x = self.transformer_encoder(x)  # [seq_len, batch_size, d_model]
        x = x.mean(dim=0)  # [batch_size, d_model]
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x

# 超参数搜索的网格(移除 batch_size)
param_grid = {
    'd_model': [64, 128],
    'nhead': [4, 8],
    'num_encoder_layers': [2, 3],
    'dim_feedforward': [256, 512],
    'learning_rate': [0.0001, 0.0005, 0.001]
}

# 评估函数(移除 batch_size)
def evaluate_model(d_model, nhead, num_encoder_layers, dim_feedforward, learning_rate):
    # 初始化模型
    model = EnhancedTransformerModel(
        input_size=1, 
        d_model=d_model, 
        nhead=nhead, 
        num_encoder_layers=num_encoder_layers, 
        dim_feedforward=dim_feedforward
    ).to(device)
    loss_function = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    # 训练模型
    epochs = 100
    model.train()
    for epoch in range(epochs):
        optimizer.zero_grad()
        y_pred = model(X_train)
        loss = loss_function(y_pred, Y_train)
        loss.backward()
        optimizer.step()

    # 预测和计算损失
    model.eval()
    with torch.no_grad():
        test_predictions = model(X_test).cpu().numpy()
    test_predictions_scaled = scaler.inverse_transform(test_predictions)
    Y_test_scaled = scaler.inverse_transform(Y_test.cpu().numpy())
    mse = mean_squared_error(Y_test_scaled, test_predictions_scaled)
    
    return mse

# 网格搜索
best_params = None
best_mse = float('inf')

for params in ParameterGrid(param_grid):
    print(f'Evaluating with parameters: {params}')
    mse = evaluate_model(**params)
    print(f'MSE: {mse}')
    
    if mse < best_mse:
        best_mse = mse
        best_params = params

print(f'Best parameters: {best_params}')
print(f'Best MSE: {best_mse}')

# 使用最佳参数实例化模型
model = EnhancedTransformerModel(
    input_size=1, 
    d_model=best_params['d_model'], 
    nhead=best_params['nhead'], 
    num_encoder_layers=best_params['num_encoder_layers'], 
    dim_feedforward=best_params['dim_feedforward']
).to(device)

# 定义损失函数和优化器
loss_function = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=best_params['learning_rate'])

# 训练模型
epochs = 500
train_losses = []
for epoch in range(epochs):
    model.train()
    optimizer.zero_grad()
    y_pred = model(X_train)
    loss = loss_function(y_pred, Y_train)
    loss.backward()
    optimizer.step()

    train_losses.append(loss.item())
    if epoch % 50 == 0:
        print(f'Epoch {epoch}, Loss: {loss.item()}')

# 绘制训练损失
plt.plot(train_losses, label="训练损失")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()
plt.show()

# 测试集预测
model.eval()
with torch.no_grad():
    test_predictions = model(X_test).cpu().numpy()

# 逆归一化
test_predictions_scaled = scaler.inverse_transform(test_predictions)
Y_test_scaled = scaler.inverse_transform(Y_test.cpu().numpy())

# 确保预测值不小于0
test_predictions_scaled = np.clip(test_predictions_scaled, 0, None)

# 计算残差并中心化
residuals = Y_test_scaled.flatten() - test_predictions_scaled.flatten()
residuals_centered = residuals - residuals.mean()

# Bootstrap参数
n_bootstrap = 1000  # 重采样次数
bootstrap_predictions = []

np.random.seed(42)  # 为了结果可复现

for _ in range(n_bootstrap):
    # 从中心化的残差中有放回地采样
    sampled_residuals = np.random.choice(residuals_centered, size=len(residuals_centered), replace=True)
    # 生成新的预测样本
    bootstrap_pred = test_predictions_scaled.flatten() + sampled_residuals
    # 确保预测值不小于0
    bootstrap_pred = np.clip(bootstrap_pred, 0, None)
    bootstrap_predictions.append(bootstrap_pred)

bootstrap_predictions = np.array(bootstrap_predictions)

# 计算置信区间(例如 95%)
lower_percentile = 2.5
upper_percentile = 97.5
ci_lower = np.percentile(bootstrap_predictions, lower_percentile, axis=0)
ci_upper = np.percentile(bootstrap_predictions, upper_percentile, axis=0)


# 预测未来的步骤数(例如 30 天)
future_steps = 30
future_predictions = []
last_sequence = scaled_data[-look_back:].reshape(1, look_back, 1)
last_sequence = torch.tensor(last_sequence, dtype=torch.float32).to(device)

model.eval()
with torch.no_grad():
    for _ in range(future_steps):
        pred_scaled = model(last_sequence).cpu().numpy()
        future_predictions.append(pred_scaled.flatten()[0])
        # 更新序列:移除第一个元素,添加新的预测
        new_sequence = np.append(last_sequence.cpu().numpy().flatten()[1:], pred_scaled)
        last_sequence = torch.tensor(new_sequence.reshape(1, look_back, 1), dtype=torch.float32).to(device)

# 逆归一化
future_predictions_scaled = scaler.inverse_transform(np.array(future_predictions).reshape(-1, 1)).flatten()
future_predictions_scaled = np.clip(future_predictions_scaled, 0, None)

# Bootstrap 未来预测置信区间
bootstrap_future_predictions = []

for _ in range(n_bootstrap):
    # 假设未来预测的残差分布与测试集相似
    sampled_residuals = np.random.choice(residuals_centered, size=future_steps, replace=True)
    bootstrap_future_pred = future_predictions_scaled + sampled_residuals
    # 确保预测值不小于0
    bootstrap_future_pred = np.clip(bootstrap_future_pred, 0, None)
    bootstrap_future_predictions.append(bootstrap_future_pred)

bootstrap_future_predictions = np.array(bootstrap_future_predictions)

# 计算置信区间(例如 95%)
ci_lower_future = np.percentile(bootstrap_future_predictions, lower_percentile, axis=0)
ci_upper_future = np.percentile(bootstrap_future_predictions, upper_percentile, axis=0)

# 生成未来的日期
last_date = data['日期'].iloc[-1]

# 确保 last_date 是 pandas Timestamp 对象
if not isinstance(last_date, pd.Timestamp):
    last_date = pd.to_datetime(last_date)

# 使用 start 和 periods 生成未来的日期,不使用 closed 参数
future_dates = pd.date_range(start=last_date + pd.Timedelta(days=1), periods=future_steps)


4.4 结果制图


# 可视化结果
plt.figure(figsize=(12,6))
plt.plot(data['日期'], data['发病数'], label='历史数据')
plt.plot(data['日期'].iloc[-len(Y_test):], Y_test_scaled, label='实际值', color='green')
plt.plot(data['日期'].iloc[-len(Y_test):], test_predictions_scaled, label='预测值', color='red')
plt.fill_between(data['日期'].iloc[-len(Y_test):], ci_lower, ci_upper, color='orange', alpha=0.3, label='95% 置信区间')
plt.plot(future_dates, future_predictions_scaled, label='未来预测', color='blue')
plt.fill_between(future_dates, ci_lower_future, ci_upper_future, color='lightblue', alpha=0.3, label='95% 置信区间')
plt.xlabel('日期')
plt.ylabel('发病数')
plt.legend()
plt.show()

4.5 模型评价指标与残差分析可视化

rmse = np.sqrt(mean_squared_error(Y_test_scaled, test_predictions_scaled))
mae = mean_absolute_error(Y_test_scaled, test_predictions_scaled)
mape = np.mean(np.abs((Y_test_scaled - test_predictions_scaled) / Y_test_scaled)) * 100
smape = np.mean(2 * np.abs(Y_test_scaled - test_predictions_scaled) / (np.abs(Y_test_scaled) + np.abs(test_predictions_scaled))) * 100

# 打印结果
print(f"RMSE: {rmse:.4f}")
print(f"MAE: {mae:.4f}")
print(f"MAPE: {mape:.2f}%")
print(f"SMAPE: {smape:.2f}%")
# 绘制残差的直方图
plt.figure(figsize=(10, 6))
sns.histplot(residuals, kde=True, bins=30, color='blue', stat='density')
plt.title('Residuals Distribution')
plt.xlabel('Residuals')
plt.ylabel('Density')
plt.show()

# 进行 Shapiro-Wilk 正态性检验
stat, p_value = shapiro(residuals)
print(f'Shapiro-Wilk Test: Stat={stat}, p-value={p_value}')


# 绘制残差的自相关函数(ACF)
plt.figure(figsize=(10, 6))
plot_acf(residuals, lags=30)  # 选择lags的数量,根据需要调整
plt.title('ACF of Residuals')
plt.show()

# 绘制 Q-Q 图

plt.figure(figsize=(6, 6))
probplot(residuals_centered, dist="norm", plot=plt)
plt.title("Q-Q 图")
plt.show()

5 结果

5.1数据预处理

png

5.2 超参数网格搜索与训练损失

Epoch 50, Loss: 0.041903600096702576
Epoch 100, Loss: 0.03575131669640541
Epoch 150, Loss: 0.025699811056256294
Epoch 200, Loss: 0.019787732511758804
Epoch 250, Loss: 0.012308957055211067
Epoch 300, Loss: 0.010088082402944565
Epoch 350, Loss: 0.0069724190980196
Epoch 400, Loss: 0.0056532821618020535
Epoch 450, Loss: 0.0049051567912101746

png

Best parameters: {’d_model': 64, ‘dim_feedforward’: 512, ‘learning_rate’: 0.0005, ‘nhead’: 4, ‘num_encoder_layers’: 2}

5.3 模型预测结果

RMSE: 54.3682
MAE: 42.9718
MAPE: 14.77%
SMAPE: 16.44%  

png

5.4 模型评价指标与残差分析结果

5.4.1 模型评价指标

模型评价指标

RMSE: 54.3682
MAE: 42.9718
MAPE: 14.77%
SMAPE: 16.44%  

RMSE 和 MAE 都较高,这可能表明模型的整体预测能力还有待提高,本例数据本身的值较大且波动较大故仍在接受范围。 MAPE 和 SMAPE 表示中等的预测精度,14.77%和16.44% 的值表明模型的相对准确性可以接受,但仍有提升空间。

5.4.1 绘制残差的直方图与正态性检验

png

Shapiro-Wilk Test: Stat=0.9784532785415649, p-value=0.48873645067214966

绘制残差的直方图,并进行正态性检验(如 Shapiro-Wilk 测试)。如果残差分布近似于正态分布,表示模型可能已经捕捉到数据的所有信息。如果 p-value 很小(通常小于 0.05),说明残差不是正态分布,可能需要改进模型。本例p-value=0.488>0.05。

5.4.2 自相关函数 (ACF) 图

png

在理想情况下,模型的残差应该接近白噪声(即没有显著的自相关),所有滞后特征的自相关系数应该都落在置信区间内。本例绝大部分滞后特征的自相关系数都在置信区间内,仅有一个滞后特征超出,这可能是由于随机性或样本误差导致的,而不是模型本身的问题。

5.4.3 Q-Q 图

png

说明残差的大部分数据分布与正态分布较为接近,模型的假设总体上是合理的。

5.5 讨论与改进

5.5.1时间特征处理

未引入例如周、月份、季节性等的时间特征。

5.5.2正则化和早停

当前模型可能存在过拟合风险,尤其是对小样本数据集。可对其增加正则化手段(L2 正则化、增加 Dropout 层)

5.5.3模型的局限性

①高计算复杂度:当序列长度较长(例如,超过几千)时,计算注意力权重矩阵会消耗大量内存,并显著增加训练时间。 ②Transformer 模型在时间序列预测中表现较好,但通常更适用于多维时间序列或具有复杂依赖关系的数据。如果数据相对简单(单变量时间序列),复杂的 Transformer 架构可能导致过拟合。 ③时间序列通常有明确的因果关系(即过去影响未来),而 Transformer 模型本质上是同时处理输入序列的所有时间步,并不直接捕捉因果性。 ④可解释性问题:Transformer 模型的注意力机制可以提供一定的可解释性(如观察哪些输入时间步对预测贡献更大),但相比于传统模型(如 ARIMA 或简单线性回归)的参数显著性,Transformer 的可解释性仍较弱。

胡子欣
拉夫克劳保安