GRU模型

使用Python实现GRU模型,预测ILI+


日期

GRU模型

下载讲解视频

1 概述

1.1 模型假设

1.2 模型描述

GRU(门控循环单元)是一种循环神经网络(RNN)的变体,旨在解决传统RNN在处理长序列数据时的梯度消失问题。GRU通过引入重置门和更新门,有效地捕捉时间序列数据中的长期依赖关系,适用于时间序列预测任务,如流感阳性率的预测。

1.3 模型参数及含义

参数 含义
input_size 输入特征的维度
hidden_size GRU隐藏层的神经元数量
output_size 输出的维度
num_layers GRU层的数量
sequence_length 输入序列的长度(时间步数)
batch_size 每批训练的数据样本数量
learning_rate 优化器的学习率
epochs 每次引导法训练的轮数
n_bootstrap 引导法的迭代次数
fold 交叉验证的折数

2 模型流程图及方程

2.1 模型流程图

GRU模型流程图

2.2 GRU模型方程

GRU模型的核心由以下几个门控机制组成:

  1. 更新门(Update Gate): $$ z_t = \sigma(W_z \cdot [h_{t-1}, x_t]) $$
  2. 重置门(Reset Gate): $$ r_t = \sigma(W_r \cdot [h_{t-1}, x_t]) $$
  3. 候选隐藏状态(Candidate Activation): $$ h_t = \tanh(W \cdot [r_t * h_{t-1}, x_t]) $$
  4. 最终隐藏状态(Final Activation): $$ h_t = (1 - z_t) * h_{t-1} + z_t * \tilde{h}_t $$

其中,($\sigma$) 表示sigmoid激活函数,($\tanh$) 表示双曲正切激活函数,($x_t$) 为当前输入,($h_{t-1}$) 为前一时间步的隐藏状态。

3 适用疾病流行条件

GRU模型适用于以下疾病流行条件的预测:

4 代码(Python)

4.1 条件设定

在开始建模之前,需要配置环境,导入必要的库。

import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.tsa.stattools import adfuller
from statsmodels.sandbox.stats.runs import runstest_1samp
import seaborn as sns
import numpy as np
from matplotlib import rcParams
from scipy.stats import skew, kurtosis
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_absolute_error, mean_squared_error
from statsmodels.graphics.gofplots import qqplot

# 配置 Matplotlib 中文字体支持,避免中文显示问题
plt.rcParams['font.sans-serif'] = ['SimHei']  # 替换为本地的中文字体,如 "SimSun" 或 "SimHei"
plt.rcParams['axes.unicode_minus'] = False   # 避免负号显示问题

4.2 检查数据

import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.tsa.stattools import adfuller
from statsmodels.sandbox.stats.runs import runstest_1samp
import seaborn as sns
import numpy as np
from matplotlib import rcParams

# 配置 Matplotlib 中文字体支持,避免中文显示问题
plt.rcParams['font.sans-serif'] = ['SimHei']  # 替换为本地的中文字体,如 "SimSun" 或 "SimHei"
plt.rcParams['axes.unicode_minus'] = False   # 避免负号显示问题

# 1. 数据导入
file_path = 'flu_positive.xlsx'  
sheet_name = 'Positive Rate'  

# 加载数据
df = pd.read_excel(file_path, sheet_name=sheet_name)

# 检查数据
print("数据预览:")
display(df.head())

# 确保目标列存在,提取目标列数据
if 'ILI+' not in df.columns:
    raise ValueError("目标列 'ILI+' 不存在,请检查数据文件!")

# 提取目标列数据
data_values = df['ILI+'].dropna().values  # 删除缺失值,确保数据完整



# 绘制时间序列图
plt.figure(figsize=(12, 6))
plt.plot(data_values,alpha=0.7)
plt.title("时间序列数据可视化")
plt.xlabel("时间步")
plt.ylabel("ILI+ 值")
plt.grid()
plt.show()


#  平稳性检验(ADF检验)
adf_result = adfuller(data_values)
print("\nADF(Augmented Dickey-Fuller)单位根检验结果:")
print(f"ADF 统计量: {adf_result[0]:.3f}")
print(f"p 值: {adf_result[1]:.3f}")
print("临界值:")
for key, value in adf_result[4].items():
    print(f"  {key}: {value:.3f}")
if adf_result[1] < 0.05:
    print("结论: 数据是平稳的,可以直接用于时间序列建模。")
else:
    print("结论: 数据是非平稳的,建议进行差分或对数变换。")

4.3 GRU model

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import MinMaxScaler
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_absolute_error, mean_squared_error
from statsmodels.graphics.gofplots import qqplot
from statsmodels.graphics.tsaplots import plot_acf

# 数据导入
file_path = 'flu_positive.xlsx'
data = pd.ExcelFile(file_path)
df = data.parse('Positive Rate')

# 提取目标列 "ILI+" 并标准化数据
scaler = MinMaxScaler()
df['num_scaled'] = scaler.fit_transform(df[['ILI+']])

# 创建时间序列数据 (设定序列长度)
sequence_length = 30

def create_sequences(data, sequence_length):
    X, y = [], []
    for i in range(sequence_length, len(data)):
        X.append(data[i - sequence_length:i])
        y.append(data[i])
    return np.array(X), np.array(y)

# 准备序列数据
data_scaled = df['num_scaled'].values
X, y = create_sequences(data_scaled, sequence_length)

# 创建对应的日期
dates = df['date'].iloc[sequence_length:].reset_index(drop=True)

# 检查 GPU 是否可用
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# 定义 GRU 模型
class GRUModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers):
        super(GRUModel, self).__init__()
        self.gru = nn.GRU(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
        self.activation = nn.Softplus()  # 添加 Softplus 激活函数

    def forward(self, x):
        h0 = torch.zeros(self.gru.num_layers, x.size(0), self.gru.hidden_size).to(device)
        out, _ = self.gru(x, h0)
        out = self.fc(out[:, -1, :])  # 取最后一个时间步的输出
        out = self.activation(out)  # 确保输出非负
        return out

# 自定义损失函数,加入对负值的惩罚
def custom_loss_function(outputs, targets):
    mse_loss = nn.MSELoss()(outputs, targets)
    penalty = torch.mean(torch.relu(-outputs))  # 对负值部分进行惩罚
    return mse_loss + penalty * 10  # 10 是惩罚系数,可以调整

# 定义 GRU 模型参数
input_size = 1
hidden_size = 128
output_size = 1
num_layers = 2

# 定义 Bootstrap 训练函数
def bootstrap_train_model(base_model, train_loader, criterion, optimizer_fn, n_bootstrap=100, epochs_per_bootstrap=50, noise_scale=0.4):
    all_results = {
        "train_predictions": [],
        "test_predictions": [],
        "future_predictions": []
    }
    for i in range(n_bootstrap):
        print(f"Bootstrap iteration {i+1}/{n_bootstrap}")
        # 初始化模型和优化器
        model = GRUModel(input_size, hidden_size, output_size, num_layers).to(device)
        optimizer = optimizer_fn(model.parameters())

        model.train()
        for epoch in range(epochs_per_bootstrap):
            epoch_loss = 0
            # 动态调整学习率
            if epoch == 350:
                for param_group in optimizer.param_groups:
                    param_group['lr'] = 0.001

            for inputs, targets in train_loader:
                inputs = inputs.unsqueeze(-1).to(device)
                targets = targets.unsqueeze(-1).to(device)

                # 添加噪声到输入
                noisy_inputs = inputs + torch.randn_like(inputs) * noise_scale

                # 前向传播
                outputs = model(noisy_inputs)
                loss = criterion(outputs, targets)

                # 反向传播和优化
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                epoch_loss += loss.item()

            if (epoch + 1) % 100 == 0:
                print(f"Epoch [{epoch + 1}/{epochs_per_bootstrap}], Loss: {epoch_loss / len(train_loader):.4f}")

        # 保存训练集预测
        model.eval()
        with torch.no_grad():
            train_preds = model(torch.tensor(X_train, dtype=torch.float32).unsqueeze(-1).to(device)).cpu().numpy()
            test_preds = model(torch.tensor(X_test, dtype=torch.float32).unsqueeze(-1).to(device)).cpu().numpy()

            # 获取未来预测
            future_preds = []
            current_sequence = data_scaled[-sequence_length:]
            for _ in range(21):
                input_tensor = torch.tensor(current_sequence, dtype=torch.float32).unsqueeze(0).unsqueeze(-1).to(device)
                next_pred = model(input_tensor).cpu().numpy().flatten()
                future_preds.append(next_pred[0])
                current_sequence = np.append(current_sequence[1:], next_pred)

        all_results["train_predictions"].append(train_preds)
        all_results["test_predictions"].append(test_preds)
        all_results["future_predictions"].append(future_preds)

    return all_results

# 计算均值和置信区间
def calculate_mean_and_ci(results):
    results = np.array(results)  # 转为 NumPy 数组 (n_bootstrap, n_samples)
    mean_preds = results.mean(axis=0)
    lower_bound = np.percentile(results, 2.5, axis=0)
    upper_bound = np.percentile(results, 97.5, axis=0)
    return mean_preds, lower_bound, upper_bound

# 创建存储指标的 DataFrame
metrics_df = pd.DataFrame(columns=['Fold', 'MAE', 'RMSE', 'MAPE'])

# 创建一个列表来存储所有残差、预测值和真实值
all_residuals = []
all_predictions = []
all_true_values = []

# 定义交叉验证 (3 折)
ts_split = TimeSeriesSplit(n_splits=4)
fold = 1
for train_index, test_index in ts_split.split(X):
    print(f"Fold {fold}...")

    # 划分训练集和测试集
    X_train, X_test = X[train_index], X[test_index]
    y_train, y_test = y[train_index], y[test_index]

    # 转换为 PyTorch 张量
    X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
    X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
    y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
    y_test_tensor = torch.tensor(y_test, dtype=torch.float32)

    # 创建数据加载器
    train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
    test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

    # 训练 Bootstrap 模型
    bootstrap_results = bootstrap_train_model(
        GRUModel, train_loader, custom_loss_function, 
        optimizer_fn=lambda params: optim.Adam(params, lr=0.005), 
        n_bootstrap=10, epochs_per_bootstrap=500, noise_scale=0.2
    )

    # 计算训练集、测试集、未来预测的均值和置信区间
    train_mean = np.mean(bootstrap_results["train_predictions"], axis=0)
    test_mean, test_lower, test_upper = calculate_mean_and_ci(bootstrap_results["test_predictions"])
    future_mean, future_lower, future_upper = calculate_mean_and_ci(bootstrap_results["future_predictions"])

    # 将训练集和测试集预测值反归一化
    train_mean_rescaled = scaler.inverse_transform(train_mean.reshape(-1, 1))
    test_mean_rescaled = scaler.inverse_transform(test_mean.reshape(-1, 1))
    test_lower_rescaled = scaler.inverse_transform(test_lower.reshape(-1, 1))
    test_upper_rescaled = scaler.inverse_transform(test_upper.reshape(-1, 1))
    future_mean_rescaled = scaler.inverse_transform(future_mean.reshape(-1, 1))
    future_lower_rescaled = scaler.inverse_transform(future_lower.reshape(-1, 1))
    future_upper_rescaled = scaler.inverse_transform(future_upper.reshape(-1, 1))

# 计算误差指标
    epsilon = 1e-6  # 防止分母为零的平滑常数
    y_test_rescaled = scaler.inverse_transform(y_test.reshape(-1, 1))
    mae = mean_absolute_error(y_test_rescaled, test_mean_rescaled)
    rmse = mean_squared_error(y_test_rescaled, test_mean_rescaled, squared=False)
    mape = np.mean(np.abs((y_test_rescaled - test_mean_rescaled) / (y_test_rescaled + epsilon))) * 100
    smape = np.mean(2 * np.abs(y_test_rescaled - test_mean_rescaled) / 
                (np.abs(y_test_rescaled) + np.abs(test_mean_rescaled) + epsilon)) * 100

    print(f"Fold {fold} Metrics:")
    print(f"MAE: {mae:.4f}")
    print(f"RMSE: {rmse:.4f}")
    print(f"MAPE: {mape:.2f}%")
    print(f"SMAPE: {smape:.2f}%")

# 将指标写入 DataFrame
    new_row = pd.DataFrame({
        'Fold': [fold],
        'MAE': [mae],
        'RMSE': [rmse],
        'MAPE': [mape],
        'SMAPE': [smape]
    })
    metrics_df = pd.concat([metrics_df, new_row], ignore_index=True)

    # 计算残差
    residuals = y_test_rescaled.flatten() - test_mean_rescaled.flatten()

    # 将残差添加到 all_residuals 列表中
    all_residuals.extend(residuals)

    # 存储预测值和真实值
    all_predictions.extend(test_mean_rescaled.flatten())
    all_true_values.extend(y_test_rescaled.flatten())

    # 创建未来日期范围
    last_date = pd.to_datetime(dates.iloc[-1])
    future_dates = [last_date + pd.Timedelta(days=i+1) for i in range(21)]

    # 可视化结果 (包含训练集、测试集和未来预测)
    plt.figure(figsize=(15, 8))

    # 绘制所有真实值
    plt.plot(df['date'], df['ILI+'], label='True Values (All)', color='blue')

    # 绘制训练集预测值
    plt.plot(dates.iloc[train_index], train_mean_rescaled, label='Train Predictions', color='green')

    # 绘制测试集预测值及置信区间
    plt.plot(dates.iloc[test_index], test_mean_rescaled, label='Test Predictions', color='red')
    plt.fill_between(dates.iloc[test_index], test_lower_rescaled.flatten(), test_upper_rescaled.flatten(), color='gray', alpha=0.3, label='95% Confidence Interval (Test)')

    # 绘制未来预测值及置信区间
    plt.plot(future_dates, future_mean_rescaled, label='Future Predictions (Next 21 Days)', color='orange', linestyle='--')
    plt.fill_between(future_dates, future_lower_rescaled.flatten(), future_upper_rescaled.flatten(), color='yellow', alpha=0.3, label='95% Confidence Interval (Future)')

    plt.title(f'Bootstrap GRU Model Predictions for Fold {fold}')
    plt.xlabel('Date')
    plt.ylabel('ILI+')
    plt.legend()
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()

    fold += 1

# 可视化各折叠的 MAE、RMSE、MAPE 和 SMAPE
fig, axes = plt.subplots(1, 4, figsize=(40, 5))

# MAE 可视化
axes[0].plot(metrics_df['Fold'], metrics_df['MAE'], marker='o', label='MAE')
axes[0].set_title('MAE Across Folds')
axes[0].set_xlabel('Fold')
axes[0].set_ylabel('MAE')
axes[0].legend()

# RMSE 可视化
axes[1].plot(metrics_df['Fold'], metrics_df['RMSE'], marker='o', label='RMSE', color='orange')
axes[1].set_title('RMSE Across Folds')
axes[1].set_xlabel('Fold')
axes[1].set_ylabel('RMSE')
axes[1].legend()

# MAPE 可视化
axes[2].plot(metrics_df['Fold'], metrics_df['MAPE'], marker='o', label='MAPE', color='green')
axes[2].set_title('MAPE Across Folds')
axes[2].set_xlabel('Fold')
axes[2].set_ylabel('MAPE (%)')
axes[2].legend()

# SMAPE 可视化
axes[3].plot(metrics_df['Fold'], metrics_df['SMAPE'], marker='o', label='SMAPE', color='purple')
axes[3].set_title('SMAPE Across Folds')
axes[3].set_xlabel('Fold')
axes[3].set_ylabel('SMAPE (%)')
axes[3].legend()

plt.tight_layout()
plt.show()

4.4 残差分析和可视化

# 残差分析和可视化
all_residuals = np.array(all_residuals)
all_predictions = np.array(all_predictions)
all_true_values = np.array(all_true_values)

# 计算基本统计量
residual_mean = np.mean(all_residuals)
residual_std = np.std(all_residuals)
print(f"Residual Mean: {residual_mean:.4f}")
print(f"Residual Std Dev: {residual_std:.4f}")

# 残差分布图
plt.figure(figsize=(10, 6))
sns.histplot(all_residuals, bins=50, kde=True, color='purple')
plt.title('Residuals Distribution')
plt.xlabel('Residual')
plt.ylabel('Frequency')
plt.show()



# Q-Q 图
plt.figure(figsize=(10, 6))
qqplot(all_residuals, line='s')
plt.title('Q-Q Plot of Residuals')
plt.show()

# 自相关函数 (ACF) 图
plt.figure(figsize=(10, 6))
plot_acf(all_residuals, lags=100)
plt.title('Autocorrelation of Residuals')
plt.show()

# 残差时间序列图
plt.figure(figsize=(15, 6))
plt.plot(all_true_values, label='True Values', color='blue')
plt.plot(all_predictions, label='Predicted Values', color='red')
plt.scatter(range(len(all_residuals)), all_residuals, label='Residuals', color='green', alpha=0.5)
plt.axhline(0, color='black', linestyle='--')
plt.title('Residuals Over Time')
plt.xlabel('Time')
plt.ylabel('Values')
plt.legend()
plt.show()

4.5 输出结果

4.5.1 数据检查

数据预览:

date Positive ILI ILI+ num
0 2022-12-12 1.114458 2.850183 0.000318 447768.837978
1 2022-12-13 0.557229 2.928571 0.000163 230041.887909
2 2022-12-14 0.575803 2.928571 0.000169 237709.950839
3 2022-12-15 0.594378 2.957967 0.000176 247841.001337
4 2022-12-16 0.501506 2.967766 0.000149 209808.560132

png

ADF(Augmented Dickey-Fuller)单位根检验结果:
ADF 统计量: -2.975
p 值: 0.037
临界值:
  1%: -3.440
  5%: -2.866
  10%: -2.569
结论: 数据是平稳的,可以直接用于时间序列建模。

4.5.2 第一次交叉验证

Fold 1 :
MAE: 0.0005
RMSE: 0.0006
MAPE: 64.83%
SMAPE: 42.29%

png

4.5.3 第二次交叉验证

Fold 2 :
MAE: 0.0068
RMSE: 0.0075
MAPE: 39.03%
SMAPE: 39.98%

png

4.5.4 第三次交叉验证

Fold 3 :
MAE: 0.0027
RMSE: 0.0039
MAPE: 41.93%
SMAPE: 38.34%

png

4.5.5 第四次交叉验证

Fold 4 :
MAE: 0.0008
RMSE: 0.0008
MAPE: 77.77%
SMAPE: 48.91%

png

4.5.6 MAE,RMSE,MAPE,SMAPE

png

4.5.7 基本统计量

残差均值接近于零,表明模型的预测结果在整体上是无偏的。残差标准差较小,表明模型预测的误差较低,预测结果较为准确。

4.5.8 残差分布图

Residuals Distribution

残差分布图显示残差近似服从正态分布,且集中在零附近。这表明模型的误差主要集中在预测值附近,没有明显的偏态或异常值,模型的预测性能稳定。

4.5.9 Q-Q 图

Q-Q Plot

Q-Q图进一步验证了残差的正态性。大部分点紧密分布在参考线附近,表明残差与正态分布吻合良好。这说明模型的预测误差符合正态分布的假设,有助于后续的统计分析和置信区间的构建。

4.5.10 自相关函数 (ACF) 图

Autocorrelation of Residuals

自相关函数图显示残差在各个滞后期的自相关性较低,未出现显著的自相关性。这表明模型已有效捕捉了数据中的时间依赖性,残差中的信息主要为随机噪声,模型的预测性能良好。

4.5.11 残差时间序列图

Residuals Over Time

残差时间序列图展示了残差随时间的变化情况。残差随机分布在零线附近,没有明显的趋势或周期性波动,进一步证明了模型预测的准确性和稳定性。

4.5.12 残差综合分析

通过上述残差分析,可以得出以下结论:

综上所述,GRU模型在预测流感阳性率方面表现出色,残差分析验证了模型的准确性和可靠性。模型的预测误差小且随机分布,适用于实际应用中的时间序列预测任务。

5 扩展

在保持GRU模型基本结构不变的情况下,可以进行以下扩展和改进,以提升模型的性能和适用性。同时,了解GRU模型的局限性有助于在实际应用中做出更明智的选择和调整。

5.1 模型优化

5.1.1 调整超参数

5.1.2 正则化技术

5.1.3 早停法(Early Stopping)

在训练过程中监控验证集的性能,当验证误差不再下降时,提前停止训练,防止模型过拟合。

5.2 特征工程

5.2.1 引入额外特征

除了"ILI+",可以引入其他相关指标,如气温、湿度、人口密度等,以提供更多信息,提升模型的预测能力。

5.2.2 时间特征提取

提取日期相关的特征,如月份、星期几、节假日等,帮助模型捕捉季节性和周期性模式。

5.3 模型集成

结合其他时间序列预测模型(如LSTM、ARIMA)进行集成学习,以利用不同模型的优势,提升整体预测性能。

5.4 干预措施的引入

在模型中引入外部干预变量,如公共卫生措施的实施时间点(如疫苗接种、社交距离政策),以观察其对流感阳性率的影响,提高模型的解释性和预测准确性。

5.5 GRU模型的局限性

尽管GRU模型在时间序列预测中具有诸多优势,但其也存在一些局限性:

5.5.1 数据依赖性强

GRU模型依赖大量高质量的时间序列数据进行训练。如果数据量不足或数据质量较差,模型的预测性能可能会受到显著影响。

5.5.2 计算资源需求高

训练GRU模型尤其是深层GRU模型,可能需要较高的计算资源和较长的训练时间,特别是在处理大规模数据集时。

5.5.3 参数调优复杂

GRU模型具有多个超参数(如隐藏层大小、层数、学习率等),需要通过交叉验证和实验来确定最佳配置,过程可能耗时且复杂。

5.5.4 解释性差

深度学习模型(包括GRU)通常被视为“黑箱”,其内部决策过程难以解释。这在需要模型可解释性的应用场景中可能是一个问题。

5.5.5 过拟合风险

如果模型过于复杂或训练数据不足,GRU模型可能会过拟合训练数据,导致在未见数据上的泛化能力下降。

5.5.6 对长序列的处理有限

尽管GRU能够处理一定程度的长期依赖关系,但在处理非常长的序列时,仍可能遇到梯度消失或爆炸的问题,影响模型的性能。

赵虹锋
壮汉二米五