前馈神经网络并用Bootstrap增加置信区间

使用Python实现FNN模型,拟合流行病发病人数


日期

FNN模型

1 概述

1.1 模型假设

1.2 模型描述

前馈神经网络(Feedforward Neural Network, FNN)是一种经典的神经网络模型,通过多个神经元层(通常包括输入层、隐藏层和输出层)实现输入到输出的映射。FNN不具有循环结构,适用于较短期的时间序列预测任务,或在合适的特征工程下对疾病传播等问题进行建模。

1.3 模型参数及含义

参数 含义
input_size 输入特征的维度
hidden_size 隐藏层的神经元数量
output_size 输出的维度
num_layers 隐藏层的数量
sequence_length 输入序列的长度(时间步数)
batch_size 每批训练的数据样本数量
learning_rate 优化器的学习率
epochs 每次引导法训练的轮数
n_bootstrap 引导法的迭代次数

2 模型流程图及方程

2.1 模型流程图

FNN模型流程图

2.2 FNN模型方程

FNN的核心是前馈计算,计算过程如下:

隐层输出

$$ h^{(l)} = f(W^{(l)} h^{(l-1)} + b^{(l)}) $$ 其中,$( f )$ 是激活函数,$( h^{(l)} )$ 为第 $( l )$ 层的输出,$( W^{(l)} )$ 为该层的权重矩阵,$( b^{(l)} )$ 为该层的偏置向量。

输出层

$$ y = W^{(output)} h^{(L)} + b^{(output)} $$ 其中,$( y )$ 为最终输出,$( L )$ 是总的层数。


3 适用疾病流行条件

FNN模型适用于以下疾病流行条件的预测:

短期预测:适用于预测较短时间范围内的疾病传播趋势。 特征工程充分:适合通过特征提取转换成合适的输入向量的情况,如将时间序列的各类统计特征作为输入。 非时间依赖性:对于没有明显时间依赖性、或者在时间维度上没有长期依赖关系的流行病,FNN是一个合适的选择。


4 代码(Python)

4.1 条件设定

在开始建模之前,需要配置环境,导入必要的库。

import torch
import torch.nn as nn
import pandas as pd
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import ParameterGrid
from sklearn.metrics import mean_squared_error
from statsmodels.graphics.tsaplots import plot_acf
from scipy.stats import probplot
import seaborn as sns
from scipy.stats import shapiro
from sklearn.metrics import mean_absolute_error
from scipy.stats import skew, kurtosis
from statsmodels.tsa.stattools import adfuller, acf, pacf


# 设置中文字体
matplotlib.rcParams['font.sans-serif'] = ['Microsoft YaHei']  # Windows下的常见中文字体
matplotlib.rcParams['axes.unicode_minus'] = False  # 正常显示负号

4.2 数据预处理

from statsmodels.tsa.stattools import adfuller

# 数据预处理
data = pd.read_excel('./daily_flu_counts.xlsx')
# ADF检验
def adf_test(series):
    result = adfuller(series)
    print('ADF Statistic:', result[0])
    print('p-value:', result[1])
    if result[1] < 0.05:
        print("序列是平稳的")
    else:
        print("序列是非平稳的")

# 检查发病数的平稳性
adf_test(data['发病数'])


# 使用线性插值填充缺失值
data['发病数'] = data['发病数'].interpolate(method='linear')
# 确保数据非负
data['发病数'] = data['发病数'].clip(lower=0)

# 确保 '日期' 列为 datetime 类型
data['日期'] = pd.to_datetime(data['日期'])


# 绘制原数据的曲线图
plt.figure(figsize=(12, 6))
plt.plot(data['日期'], data['发病数'], label='发病数', color='blue')


plt.title('每日发病数曲线图')
plt.xlabel('日期')
plt.ylabel('发病数')
plt.legend()
plt.grid(True)
plt.xticks(rotation=45)
plt.show()

4.3 模型训练

import torch
import torch.nn as nn
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import ParameterGrid
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt
import matplotlib

# 设置中文字体
matplotlib.rcParams['font.sans-serif'] = ['Microsoft YaHei']  # Windows下的常见中文字体
matplotlib.rcParams['axes.unicode_minus'] = False  # 正常显示负号

# 检查是否有可用的 GPU
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')

# 读取数据
data = pd.read_excel('./daily_flu_counts.xlsx')
data = data[['日期', '发病数']].sort_values('日期').fillna(0)


# 标准化数据
scaler = StandardScaler()
scaled_data = scaler.fit_transform(data['发病数'].values.reshape(-1, 1))

# 创建时间序列数据集
def create_dataset(dataset, look_back=1):
    X, Y = [], []
    for i in range(len(dataset) - look_back):
        X.append(dataset[i:(i + look_back), 0])
        Y.append(dataset[i + look_back, 0])
    return np.array(X), np.array(Y)

look_back = 14  # 滑动窗口长度
X, Y = create_dataset(scaled_data, look_back)

# 拆分训练集和测试集
train_size = int(len(X) * 0.8)
X_train, X_test = X[:train_size], X[train_size:]
Y_train, Y_test = Y[:train_size], Y[train_size:]

# 转换为 PyTorch 张量
X_train = torch.tensor(X_train, dtype=torch.float32).to(device)
X_test = torch.tensor(X_test, dtype=torch.float32).to(device)
Y_train = torch.tensor(Y_train, dtype=torch.float32).view(-1, 1).to(device)
Y_test = torch.tensor(Y_test, dtype=torch.float32).view(-1, 1).to(device)

# 改进后的 FNN 模型
class EnhancedFNNModel(nn.Module):
    def __init__(self, input_size=1, hidden_size=64, output_size=1):
        super(EnhancedFNNModel, self).__init__()
        self.fc1 = nn.Linear(input_size * look_back, hidden_size)  # 输入层
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, 64)  # 隐藏层
        self.fc3 = nn.Linear(64, output_size)  # 输出层

    def forward(self, x):
        x = x.view(x.size(0), -1)  # 展平输入
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        x = self.relu(x)
        x = self.fc3(x)
        return x


# 超参数搜索的网格
param_grid = {
    'hidden_size': [64, 128, 256],
    'learning_rate': [0.0001, 0.0005, 0.001],
    'batch_size': [16, 32, 64]
}

# 超参数搜索与模型训练评估函数
def evaluate_and_train(hidden_size, learning_rate, batch_size):
    # 初始化模型
    model = EnhancedFNNModel(input_size=1, hidden_size=hidden_size).to(device)
    loss_function = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    # 训练模型
    epochs = 100
    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()
        y_pred = model(X_train)
        loss = loss_function(y_pred, Y_train)
        loss.backward()
        optimizer.step()

    # 预测和计算测试集的MSE
    model.eval()
    with torch.no_grad():
        test_predictions = model(X_test).cpu().numpy()
    test_predictions_scaled = scaler.inverse_transform(test_predictions)
    Y_test_scaled = scaler.inverse_transform(Y_test.cpu().numpy())
    mse = mean_squared_error(Y_test_scaled, test_predictions_scaled)
    
    return mse, model  # 返回MSE和模型实例

# 网格搜索
best_params = None
best_mse = float('inf')
best_model = None  # 用于保存最佳模型实例

for params in ParameterGrid(param_grid):
    print(f'Evaluating with parameters: {params}')
    mse, trained_model = evaluate_and_train(**params)
    print(f'MSE: {mse}')
    
    if mse < best_mse:
        best_mse = mse
        best_params = params
        best_model = trained_model  # 保存当前最佳模型实例

print(f'Best parameters: {best_params}')
print(f'Best MSE: {best_mse}')

# 使用最佳参数的模型进行完整训练
print("Training the best model with full settings...")
best_hidden_size = best_params['hidden_size']
best_learning_rate = best_params['learning_rate']
best_batch_size = best_params['batch_size']

# 使用最佳模型实例继续训练
loss_function = nn.MSELoss()
optimizer = torch.optim.Adam(best_model.parameters(), lr=best_learning_rate)

# 记录训练损失
train_losses = []
epochs = 500
for epoch in range(epochs):
    best_model.train()
    optimizer.zero_grad()
    y_pred = best_model(X_train)
    loss = loss_function(y_pred, Y_train)
    loss.backward()
    optimizer.step()

    train_losses.append(loss.item())
    if epoch % 50 == 0:
        print(f'Epoch {epoch}, Loss: {loss.item()}')

# 绘制训练损失
plt.plot(train_losses, label="训练损失")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()
plt.show()

# 测试集预测
best_model.eval()
with torch.no_grad():
    test_predictions = best_model(X_test).cpu().numpy()

# 逆归一化
test_predictions_scaled = scaler.inverse_transform(test_predictions)
Y_test_scaled = scaler.inverse_transform(Y_test.cpu().numpy())

# 确保预测值不小于0
test_predictions_scaled = np.clip(test_predictions_scaled, 0, None)

# 计算残差并中心化
residuals = Y_test_scaled.flatten() - test_predictions_scaled.flatten()
residuals_centered = residuals - residuals.mean()

# Bootstrap参数
n_bootstrap = 1000  # 重采样次数
bootstrap_predictions = []

np.random.seed(42)  # 为了结果可复现

for _ in range(n_bootstrap):
    # 从中心化的残差中有放回地采样
    sampled_residuals = np.random.choice(residuals_centered, size=len(residuals_centered), replace=True)
    # 生成新的预测样本
    bootstrap_pred = test_predictions_scaled.flatten() + sampled_residuals
    # 确保预测值不小于0
    bootstrap_pred = np.clip(bootstrap_pred, 0, None)
    bootstrap_predictions.append(bootstrap_pred)

bootstrap_predictions = np.array(bootstrap_predictions)

# 计算置信区间(例如 95%)
lower_percentile = 2.5
upper_percentile = 97.5
ci_lower = np.percentile(bootstrap_predictions, lower_percentile, axis=0)
ci_upper = np.percentile(bootstrap_predictions, upper_percentile, axis=0)


# 预测未来的步骤数(例如 30 天)
future_steps = 30
future_predictions = []
last_sequence = scaled_data[-look_back:].reshape(1, look_back)

best_model.eval()
with torch.no_grad():
    for _ in range(future_steps):
        pred_scaled = best_model(torch.tensor(last_sequence, dtype=torch.float32).to(device)).cpu().numpy()
        future_predictions.append(pred_scaled.flatten()[0])
        # 更新序列:移除第一个元素,添加新的预测
        new_sequence = np.append(last_sequence.flatten()[1:], pred_scaled)
        last_sequence = new_sequence.reshape(1, look_back)

# 逆归一化
future_predictions_scaled = scaler.inverse_transform(np.array(future_predictions).reshape(-1, 1)).flatten()
future_predictions_scaled = np.clip(future_predictions_scaled, 0, None)

# Bootstrap 未来预测置信区间
bootstrap_future_predictions = []

for _ in range(n_bootstrap):
    # 假设未来预测的残差分布与测试集相似
    sampled_residuals = np.random.choice(residuals_centered, size=future_steps, replace=True)
    bootstrap_future_pred = future_predictions_scaled + sampled_residuals
    # 确保预测值不小于0
    bootstrap_future_pred = np.clip(bootstrap_future_pred, 0, None)
    bootstrap_future_predictions.append(bootstrap_future_pred)

bootstrap_future_predictions = np.array(bootstrap_future_predictions)

# 计算置信区间(例如 95%)
ci_lower_future = np.percentile(bootstrap_future_predictions, lower_percentile, axis=0)
ci_upper_future = np.percentile(bootstrap_future_predictions, upper_percentile, axis=0)

# 生成未来的日期
last_date = data['日期'].iloc[-1]

# 确保 last_date 是 pandas Timestamp 对象
if not isinstance(last_date, pd.Timestamp):
    last_date = pd.to_datetime(last_date)

# 使用 start 和 periods 生成未来的日期
future_dates = pd.date_range(start=last_date + pd.Timedelta(days=1), periods=future_steps)



4.4 结果制图


# 可视化训练集曲线、测试集预测与置信区间、以及未来预测与置信区间
plt.figure(figsize=(14, 7))

# 绘制训练集曲线
plt.plot(data['日期'][:train_size + look_back], scaler.inverse_transform(scaled_data[:train_size + look_back]), label='训练集', color='gray', linestyle='--')

# 绘制测试集预测值和实际值
plt.plot(data['日期'].iloc[train_size + look_back:], Y_test_scaled, label='实际值', color='blue')
plt.plot(data['日期'].iloc[train_size + look_back:], test_predictions_scaled, label='预测值', color='red')
plt.fill_between(data['日期'].iloc[train_size + look_back:], ci_lower, ci_upper, color='pink', alpha=0.5, label='95% 置信区间')

# 绘制未来预测值和置信区间
plt.plot(future_dates, future_predictions_scaled, label='未来预测值', color='green')
plt.fill_between(future_dates, ci_lower_future, ci_upper_future, color='lightgreen', alpha=0.5, label='95% 置信区间')

# 添加图例、标题和标签
plt.xlabel('日期')
plt.ylabel('发病数')
plt.title('某病LSTM预测并用bootstrap增加置信区间')
plt.legend()

# 显示图像
plt.show()

4.5 模型评价指标与残差分析可视化


# 计算RMSE
rmse = np.sqrt(mean_squared_error(Y_test_scaled, test_predictions_scaled))
print(f'RMSE: {rmse}')

# 计算MAE
mae = mean_absolute_error(Y_test_scaled, test_predictions_scaled)
print(f'MAE: {mae}')

# 计算MAPE
mape = np.mean(np.abs((Y_test_scaled - test_predictions_scaled) / Y_test_scaled)) * 100
print(f'MAPE: {mape}%')

# 计算SMAPE
smape = 100 * np.mean(2 * np.abs(Y_test_scaled - test_predictions_scaled) / (np.abs(Y_test_scaled) + np.abs(test_predictions_scaled)))
print(f'SMAPE: {smape}%')


# 绘制残差的直方图
plt.figure(figsize=(10, 6))
sns.histplot(residuals, kde=True, bins=30, color='blue', stat='density')
plt.title('Residuals Distribution')
plt.xlabel('Residuals')
plt.ylabel('Density')
plt.show()

# 进行 Shapiro-Wilk 正态性检验
stat, p_value = shapiro(residuals)
print(f'Shapiro-Wilk Test: Stat={stat}, p-value={p_value}')

from statsmodels.graphics.tsaplots import plot_acf

# 绘制残差的自相关函数(ACF)
plt.figure(figsize=(10, 6))
plot_acf(residuals, lags=30)  # 选择lags的数量,根据需要调整
plt.title('ACF of Residuals')
plt.show()

# 绘制 Q-Q 图

plt.figure(figsize=(6, 6))
probplot(residuals_centered, dist="norm", plot=plt)
plt.title("Q-Q 图")
plt.show()


5 结果

5.1数据预处理

平稳性检验

ADF Statistic: -3.1569840080570475
p-value: 0.022613583113120447
序列是平稳的

数据预处理

5.2 超参数网格搜索与训练损失

Best parameters: {'batch_size': 16, 'hidden_size': 64, 'learning_rate': 0.001}
Training the best model with full settings...
Epoch 0, Loss: 0.019518502056598663
Epoch 50, Loss: 0.014199284836649895
Epoch 100, Loss: 0.01099932286888361
Epoch 150, Loss: 0.007938825525343418
Epoch 200, Loss: 0.00576759735122323
Epoch 250, Loss: 0.004306826274842024
Epoch 300, Loss: 0.0032198289409279823
Epoch 350, Loss: 0.0024303440004587173
Epoch 400, Loss: 0.001890824525617063
Epoch 450, Loss: 0.0015683139208704233


训练损失

Best parameters: {‘batch_size’: 64, ‘hidden_size’: 64, ‘learning_rate’: 0.0005}

5.3 模型预测结果

拟合预测图

5.4 模型评价指标与残差分析结果

5.4.1 模型评价指标

模型评价指标

RMSE: 67.94408416748047
MAE: 55.37308120727539
MAPE: 12.321919947862625%
SMAPE: 13.267050683498383%

RMSE 和 MAE 都较高,这可能表明模型的整体预测能力还有待提高,本例数据本身的值较大且波动较大故仍在接受范围。 MAPE 和 SMAPE 表示中等的预测精度,12.32%和13.27%的值表明模型的相对准确性可以接受,但仍有提升空间。

5.4.2 绘制残差的直方图与正态性检验

残差直方图

Shapiro-Wilk Test: Stat=0.9481209516525269, p-value=0.20621141418801466

绘制残差的直方图,并进行正态性检验(如 Shapiro-Wilk 测试)。如果残差分布近似于正态分布,表示模型可能已经捕捉到数据的所有信息。如果 p-value 很小(通常小于 0.05),说明残差不是正态分布,可能需要改进模型。本例p-value=0.206>0.05。

5.4.3 自相关函数 (ACF) 图

ACF

残差的自相关性可以通过计算自相关函数(ACF)来检验。如果残差没有自相关性,那么可以认为模型的误差是随机的,不含有可预测的信息。

5.4.4 Q-Q 图

Q-Q图

Q-Q图进一步验证了残差的正态性。大部分点紧密分布在参考线附近,表明残差与正态分布吻合良好。这说明模型的预测误差符合正态分布的假设,有助于后续的统计分析和置信区间的构建。

5.5 讨论与改进

5.1.2 正则化技术

5.1.3 早停法(Early Stopping)

在训练过程中监控验证集的性能,当验证误差不再下降时,提前停止训练,防止模型过拟合。

5.1.4 引入额外特征

可以引入其他相关指标,如气象因素(温度、湿度等)、社会因素(人口密度、经济水平等)等,以提供更多信息,提升模型的预测能力。

5.1.5 模型局限性

①无法捕捉时序依赖:FNN 是静态的,每次输入都被独立地处理,无法自然地捕捉数据中的时间依赖性(如时序数据中的趋势和季节性)。 ②过拟合风险:如果没有适当的正则化(如 dropout 或 L2 正则化),FNN 容易发生过拟合,特别是数据量较少时。 ③特征选择困难:FNN 对特征选择比较敏感,可能需要较多的前期特征工程工作,以确保网络能学习到有效的模式。

胡子欣
拉夫克劳保安