使用Python实现FNN模型,拟合流行病发病人数
前馈神经网络(Feedforward Neural Network, FNN)是一种经典的神经网络模型,通过多个神经元层(通常包括输入层、隐藏层和输出层)实现输入到输出的映射。FNN不具有循环结构,适用于较短期的时间序列预测任务,或在合适的特征工程下对疾病传播等问题进行建模。
参数 | 含义 |
---|---|
input_size | 输入特征的维度 |
hidden_size | 隐藏层的神经元数量 |
output_size | 输出的维度 |
num_layers | 隐藏层的数量 |
sequence_length | 输入序列的长度(时间步数) |
batch_size | 每批训练的数据样本数量 |
learning_rate | 优化器的学习率 |
epochs | 每次引导法训练的轮数 |
n_bootstrap | 引导法的迭代次数 |
FNN的核心是前馈计算,计算过程如下:
$$ h^{(l)} = f(W^{(l)} h^{(l-1)} + b^{(l)}) $$ 其中,$( f )$ 是激活函数,$( h^{(l)} )$ 为第 $( l )$ 层的输出,$( W^{(l)} )$ 为该层的权重矩阵,$( b^{(l)} )$ 为该层的偏置向量。
$$ y = W^{(output)} h^{(L)} + b^{(output)} $$ 其中,$( y )$ 为最终输出,$( L )$ 是总的层数。
FNN模型适用于以下疾病流行条件的预测:
短期预测:适用于预测较短时间范围内的疾病传播趋势。 特征工程充分:适合通过特征提取转换成合适的输入向量的情况,如将时间序列的各类统计特征作为输入。 非时间依赖性:对于没有明显时间依赖性、或者在时间维度上没有长期依赖关系的流行病,FNN是一个合适的选择。
在开始建模之前,需要配置环境,导入必要的库。
import torch
import torch.nn as nn
import pandas as pd
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import ParameterGrid
from sklearn.metrics import mean_squared_error
from statsmodels.graphics.tsaplots import plot_acf
from scipy.stats import probplot
import seaborn as sns
from scipy.stats import shapiro
from sklearn.metrics import mean_absolute_error
from scipy.stats import skew, kurtosis
from statsmodels.tsa.stattools import adfuller, acf, pacf
# 设置中文字体
matplotlib.rcParams['font.sans-serif'] = ['Microsoft YaHei'] # Windows下的常见中文字体
matplotlib.rcParams['axes.unicode_minus'] = False # 正常显示负号
from statsmodels.tsa.stattools import adfuller
# 数据预处理
data = pd.read_excel('./daily_flu_counts.xlsx')
# ADF检验
def adf_test(series):
result = adfuller(series)
print('ADF Statistic:', result[0])
print('p-value:', result[1])
if result[1] < 0.05:
print("序列是平稳的")
else:
print("序列是非平稳的")
# 检查发病数的平稳性
adf_test(data['发病数'])
# 使用线性插值填充缺失值
data['发病数'] = data['发病数'].interpolate(method='linear')
# 确保数据非负
data['发病数'] = data['发病数'].clip(lower=0)
# 确保 '日期' 列为 datetime 类型
data['日期'] = pd.to_datetime(data['日期'])
# 绘制原数据的曲线图
plt.figure(figsize=(12, 6))
plt.plot(data['日期'], data['发病数'], label='发病数', color='blue')
plt.title('每日发病数曲线图')
plt.xlabel('日期')
plt.ylabel('发病数')
plt.legend()
plt.grid(True)
plt.xticks(rotation=45)
plt.show()
import torch
import torch.nn as nn
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import ParameterGrid
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt
import matplotlib
# 设置中文字体
matplotlib.rcParams['font.sans-serif'] = ['Microsoft YaHei'] # Windows下的常见中文字体
matplotlib.rcParams['axes.unicode_minus'] = False # 正常显示负号
# 检查是否有可用的 GPU
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')
# 读取数据
data = pd.read_excel('./daily_flu_counts.xlsx')
data = data[['日期', '发病数']].sort_values('日期').fillna(0)
# 标准化数据
scaler = StandardScaler()
scaled_data = scaler.fit_transform(data['发病数'].values.reshape(-1, 1))
# 创建时间序列数据集
def create_dataset(dataset, look_back=1):
X, Y = [], []
for i in range(len(dataset) - look_back):
X.append(dataset[i:(i + look_back), 0])
Y.append(dataset[i + look_back, 0])
return np.array(X), np.array(Y)
look_back = 14 # 滑动窗口长度
X, Y = create_dataset(scaled_data, look_back)
# 拆分训练集和测试集
train_size = int(len(X) * 0.8)
X_train, X_test = X[:train_size], X[train_size:]
Y_train, Y_test = Y[:train_size], Y[train_size:]
# 转换为 PyTorch 张量
X_train = torch.tensor(X_train, dtype=torch.float32).to(device)
X_test = torch.tensor(X_test, dtype=torch.float32).to(device)
Y_train = torch.tensor(Y_train, dtype=torch.float32).view(-1, 1).to(device)
Y_test = torch.tensor(Y_test, dtype=torch.float32).view(-1, 1).to(device)
# 改进后的 FNN 模型
class EnhancedFNNModel(nn.Module):
def __init__(self, input_size=1, hidden_size=64, output_size=1):
super(EnhancedFNNModel, self).__init__()
self.fc1 = nn.Linear(input_size * look_back, hidden_size) # 输入层
self.relu = nn.ReLU()
self.fc2 = nn.Linear(hidden_size, 64) # 隐藏层
self.fc3 = nn.Linear(64, output_size) # 输出层
def forward(self, x):
x = x.view(x.size(0), -1) # 展平输入
x = self.fc1(x)
x = self.relu(x)
x = self.fc2(x)
x = self.relu(x)
x = self.fc3(x)
return x
# 超参数搜索的网格
param_grid = {
'hidden_size': [64, 128, 256],
'learning_rate': [0.0001, 0.0005, 0.001],
'batch_size': [16, 32, 64]
}
# 超参数搜索与模型训练评估函数
def evaluate_and_train(hidden_size, learning_rate, batch_size):
# 初始化模型
model = EnhancedFNNModel(input_size=1, hidden_size=hidden_size).to(device)
loss_function = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
# 训练模型
epochs = 100
for epoch in range(epochs):
model.train()
optimizer.zero_grad()
y_pred = model(X_train)
loss = loss_function(y_pred, Y_train)
loss.backward()
optimizer.step()
# 预测和计算测试集的MSE
model.eval()
with torch.no_grad():
test_predictions = model(X_test).cpu().numpy()
test_predictions_scaled = scaler.inverse_transform(test_predictions)
Y_test_scaled = scaler.inverse_transform(Y_test.cpu().numpy())
mse = mean_squared_error(Y_test_scaled, test_predictions_scaled)
return mse, model # 返回MSE和模型实例
# 网格搜索
best_params = None
best_mse = float('inf')
best_model = None # 用于保存最佳模型实例
for params in ParameterGrid(param_grid):
print(f'Evaluating with parameters: {params}')
mse, trained_model = evaluate_and_train(**params)
print(f'MSE: {mse}')
if mse < best_mse:
best_mse = mse
best_params = params
best_model = trained_model # 保存当前最佳模型实例
print(f'Best parameters: {best_params}')
print(f'Best MSE: {best_mse}')
# 使用最佳参数的模型进行完整训练
print("Training the best model with full settings...")
best_hidden_size = best_params['hidden_size']
best_learning_rate = best_params['learning_rate']
best_batch_size = best_params['batch_size']
# 使用最佳模型实例继续训练
loss_function = nn.MSELoss()
optimizer = torch.optim.Adam(best_model.parameters(), lr=best_learning_rate)
# 记录训练损失
train_losses = []
epochs = 500
for epoch in range(epochs):
best_model.train()
optimizer.zero_grad()
y_pred = best_model(X_train)
loss = loss_function(y_pred, Y_train)
loss.backward()
optimizer.step()
train_losses.append(loss.item())
if epoch % 50 == 0:
print(f'Epoch {epoch}, Loss: {loss.item()}')
# 绘制训练损失
plt.plot(train_losses, label="训练损失")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()
plt.show()
# 测试集预测
best_model.eval()
with torch.no_grad():
test_predictions = best_model(X_test).cpu().numpy()
# 逆归一化
test_predictions_scaled = scaler.inverse_transform(test_predictions)
Y_test_scaled = scaler.inverse_transform(Y_test.cpu().numpy())
# 确保预测值不小于0
test_predictions_scaled = np.clip(test_predictions_scaled, 0, None)
# 计算残差并中心化
residuals = Y_test_scaled.flatten() - test_predictions_scaled.flatten()
residuals_centered = residuals - residuals.mean()
# Bootstrap参数
n_bootstrap = 1000 # 重采样次数
bootstrap_predictions = []
np.random.seed(42) # 为了结果可复现
for _ in range(n_bootstrap):
# 从中心化的残差中有放回地采样
sampled_residuals = np.random.choice(residuals_centered, size=len(residuals_centered), replace=True)
# 生成新的预测样本
bootstrap_pred = test_predictions_scaled.flatten() + sampled_residuals
# 确保预测值不小于0
bootstrap_pred = np.clip(bootstrap_pred, 0, None)
bootstrap_predictions.append(bootstrap_pred)
bootstrap_predictions = np.array(bootstrap_predictions)
# 计算置信区间(例如 95%)
lower_percentile = 2.5
upper_percentile = 97.5
ci_lower = np.percentile(bootstrap_predictions, lower_percentile, axis=0)
ci_upper = np.percentile(bootstrap_predictions, upper_percentile, axis=0)
# 预测未来的步骤数(例如 30 天)
future_steps = 30
future_predictions = []
last_sequence = scaled_data[-look_back:].reshape(1, look_back)
best_model.eval()
with torch.no_grad():
for _ in range(future_steps):
pred_scaled = best_model(torch.tensor(last_sequence, dtype=torch.float32).to(device)).cpu().numpy()
future_predictions.append(pred_scaled.flatten()[0])
# 更新序列:移除第一个元素,添加新的预测
new_sequence = np.append(last_sequence.flatten()[1:], pred_scaled)
last_sequence = new_sequence.reshape(1, look_back)
# 逆归一化
future_predictions_scaled = scaler.inverse_transform(np.array(future_predictions).reshape(-1, 1)).flatten()
future_predictions_scaled = np.clip(future_predictions_scaled, 0, None)
# Bootstrap 未来预测置信区间
bootstrap_future_predictions = []
for _ in range(n_bootstrap):
# 假设未来预测的残差分布与测试集相似
sampled_residuals = np.random.choice(residuals_centered, size=future_steps, replace=True)
bootstrap_future_pred = future_predictions_scaled + sampled_residuals
# 确保预测值不小于0
bootstrap_future_pred = np.clip(bootstrap_future_pred, 0, None)
bootstrap_future_predictions.append(bootstrap_future_pred)
bootstrap_future_predictions = np.array(bootstrap_future_predictions)
# 计算置信区间(例如 95%)
ci_lower_future = np.percentile(bootstrap_future_predictions, lower_percentile, axis=0)
ci_upper_future = np.percentile(bootstrap_future_predictions, upper_percentile, axis=0)
# 生成未来的日期
last_date = data['日期'].iloc[-1]
# 确保 last_date 是 pandas Timestamp 对象
if not isinstance(last_date, pd.Timestamp):
last_date = pd.to_datetime(last_date)
# 使用 start 和 periods 生成未来的日期
future_dates = pd.date_range(start=last_date + pd.Timedelta(days=1), periods=future_steps)
# 可视化训练集曲线、测试集预测与置信区间、以及未来预测与置信区间
plt.figure(figsize=(14, 7))
# 绘制训练集曲线
plt.plot(data['日期'][:train_size + look_back], scaler.inverse_transform(scaled_data[:train_size + look_back]), label='训练集', color='gray', linestyle='--')
# 绘制测试集预测值和实际值
plt.plot(data['日期'].iloc[train_size + look_back:], Y_test_scaled, label='实际值', color='blue')
plt.plot(data['日期'].iloc[train_size + look_back:], test_predictions_scaled, label='预测值', color='red')
plt.fill_between(data['日期'].iloc[train_size + look_back:], ci_lower, ci_upper, color='pink', alpha=0.5, label='95% 置信区间')
# 绘制未来预测值和置信区间
plt.plot(future_dates, future_predictions_scaled, label='未来预测值', color='green')
plt.fill_between(future_dates, ci_lower_future, ci_upper_future, color='lightgreen', alpha=0.5, label='95% 置信区间')
# 添加图例、标题和标签
plt.xlabel('日期')
plt.ylabel('发病数')
plt.title('某病LSTM预测并用bootstrap增加置信区间')
plt.legend()
# 显示图像
plt.show()
# 计算RMSE
rmse = np.sqrt(mean_squared_error(Y_test_scaled, test_predictions_scaled))
print(f'RMSE: {rmse}')
# 计算MAE
mae = mean_absolute_error(Y_test_scaled, test_predictions_scaled)
print(f'MAE: {mae}')
# 计算MAPE
mape = np.mean(np.abs((Y_test_scaled - test_predictions_scaled) / Y_test_scaled)) * 100
print(f'MAPE: {mape}%')
# 计算SMAPE
smape = 100 * np.mean(2 * np.abs(Y_test_scaled - test_predictions_scaled) / (np.abs(Y_test_scaled) + np.abs(test_predictions_scaled)))
print(f'SMAPE: {smape}%')
# 绘制残差的直方图
plt.figure(figsize=(10, 6))
sns.histplot(residuals, kde=True, bins=30, color='blue', stat='density')
plt.title('Residuals Distribution')
plt.xlabel('Residuals')
plt.ylabel('Density')
plt.show()
# 进行 Shapiro-Wilk 正态性检验
stat, p_value = shapiro(residuals)
print(f'Shapiro-Wilk Test: Stat={stat}, p-value={p_value}')
from statsmodels.graphics.tsaplots import plot_acf
# 绘制残差的自相关函数(ACF)
plt.figure(figsize=(10, 6))
plot_acf(residuals, lags=30) # 选择lags的数量,根据需要调整
plt.title('ACF of Residuals')
plt.show()
# 绘制 Q-Q 图
plt.figure(figsize=(6, 6))
probplot(residuals_centered, dist="norm", plot=plt)
plt.title("Q-Q 图")
plt.show()
平稳性检验
ADF Statistic: -3.1569840080570475
p-value: 0.022613583113120447
序列是平稳的
Best parameters: {'batch_size': 16, 'hidden_size': 64, 'learning_rate': 0.001}
Training the best model with full settings...
Epoch 0, Loss: 0.019518502056598663
Epoch 50, Loss: 0.014199284836649895
Epoch 100, Loss: 0.01099932286888361
Epoch 150, Loss: 0.007938825525343418
Epoch 200, Loss: 0.00576759735122323
Epoch 250, Loss: 0.004306826274842024
Epoch 300, Loss: 0.0032198289409279823
Epoch 350, Loss: 0.0024303440004587173
Epoch 400, Loss: 0.001890824525617063
Epoch 450, Loss: 0.0015683139208704233
Best parameters: {‘batch_size’: 64, ‘hidden_size’: 64, ‘learning_rate’: 0.0005}
模型评价指标
RMSE: 67.94408416748047
MAE: 55.37308120727539
MAPE: 12.321919947862625%
SMAPE: 13.267050683498383%
RMSE 和 MAE 都较高,这可能表明模型的整体预测能力还有待提高,本例数据本身的值较大且波动较大故仍在接受范围。 MAPE 和 SMAPE 表示中等的预测精度,12.32%和13.27%的值表明模型的相对准确性可以接受,但仍有提升空间。
Shapiro-Wilk Test: Stat=0.9481209516525269, p-value=0.20621141418801466
绘制残差的直方图,并进行正态性检验(如 Shapiro-Wilk 测试)。如果残差分布近似于正态分布,表示模型可能已经捕捉到数据的所有信息。如果 p-value 很小(通常小于 0.05),说明残差不是正态分布,可能需要改进模型。本例p-value=0.206>0.05。
残差的自相关性可以通过计算自相关函数(ACF)来检验。如果残差没有自相关性,那么可以认为模型的误差是随机的,不含有可预测的信息。
Q-Q图进一步验证了残差的正态性。大部分点紧密分布在参考线附近,表明残差与正态分布吻合良好。这说明模型的预测误差符合正态分布的假设,有助于后续的统计分析和置信区间的构建。
在训练过程中监控验证集的性能,当验证误差不再下降时,提前停止训练,防止模型过拟合。
可以引入其他相关指标,如气象因素(温度、湿度等)、社会因素(人口密度、经济水平等)等,以提供更多信息,提升模型的预测能力。
①无法捕捉时序依赖:FNN 是静态的,每次输入都被独立地处理,无法自然地捕捉数据中的时间依赖性(如时序数据中的趋势和季节性)。 ②过拟合风险:如果没有适当的正则化(如 dropout 或 L2 正则化),FNN 容易发生过拟合,特别是数据量较少时。 ③特征选择困难:FNN 对特征选择比较敏感,可能需要较多的前期特征工程工作,以确保网络能学习到有效的模式。