使用Python实现基于离散网格坐标的SEIAR个体随机模型,模拟流行病传播过程
1.一例病例与易感者接触可能会被感染,单次接触感染概率为q。设无症状感染者的传播能力是病例的κ倍,κ在0到1之间,当κ等于1时意味着无症状感染者与病例的传播能力相同,当κ等于0时意味着无症状感染者没有传染性。
2.感染病原体的个体需要一定的时间(潜伏期)后方可变为染病者,潜伏期停留时间为w1,个体的潜伏期停留时间在本模型中被设置为一个正态分布的随机数字,正态分布的均数和标准差可根据实际情况进行设定;感染病原体的个体需要一定的时间(潜隐期)后方可变为无症状感染者,潜隐期停留时间为w2,个体的潜隐期停留时间在本模型中也被设置为一个正态分布的随机数字,正态分布的均数和标准差可根据实际情况进行设定;
3.无症状感染者的比例为p,感染者的比例为1–p;
4.病例和无症状感染者的恢复时间分别为r1和r2,在本模型中也被设置为一个正态分布的随机数字,正态分布的均数和标准差可根据实际情况进行设定;
5.恢复者在研究期间内具有免疫力,不会被感染;
6.不考虑种群流动性,不考虑自然出生和死亡,不考虑病死,本模型主要用于短期暴发的模拟。
易感-暴露-感染-康复-无症状感染模型(Susceptible-Exposed-Infectious-Asymptomatic-Recovered Model,SEIAR Model)的个体随机模型(ABM)可用于描述传染病传播。该模型将个体分为易感者(S)、潜伏者(E)、感染者(I)、无症状感染者(A)和康复者(R)。在模型中,易感者可能因接触感染者而被感染,潜伏者在潜伏期内不表现症状也不能传播病毒,感染者则表现出症状并积极传播病毒,无症状感染者则在未表现症状的情况下也能传播病毒。康复者是从感染中恢复并获得免疫力的个体。通过定义个体之间的交互和状态转变,SEIAR模型能够模拟疾病在群体中的传播动态,帮助研究人员分析不同干预措施的效果。
表1 模型参数及其含义
参数 | 含义 | 取值1 | 取值2 |
---|---|---|---|
S(Susceptible) | 易感者 | 200 | 200 |
E(Exposed) | 潜伏者 | 50 | 100 |
I(Infectious) | 感染者 | 20 | 10 |
A(Asymptomatic) | 无症状感染者 | 30 | 30 |
R(Recovered) | 恢复者 | 0 | 0 |
N | 总人口数 | 300 | 340 |
w1 | 潜伏期停留时间 | 7,4 | 5,3 |
w2 | 潜隐期停留时间 | 9,6 | 7,6 |
r1 | 感染者恢复时间 | 16,6 | 14,4 |
r2 | 无症状感染者恢复时间 | 12,5 | 10,5 |
p | 无症状感染者比例 | 0.3 | 0.5 |
q | 单次接触感染概率 | 0.5 | 0.4 |
κ | 无症状感染者的相对感染率 | 0.7 | 0.5 |
time_step | 时间步长 | 0.1 | 0.2 |
total_days | 模拟总天数 | 30 | 30 |
算法:SEIAR模型的模拟
输入:个体数 N,时间步长 T
输出:每日状态人数、新增感染和新增恢复人数
1 初始化个体列表,设置每个个体的初始状态(易感、暴露、感染、无症状感染、康复);
2 初始化每日状态记录列表;
3 for 每个时间步 t 从 0 到 T do
4 for 每个个体 x do
5 更新个体 x 的位置;
6 if x 是暴露者 then
7 if x 的潜伏期已过 then
8 更新 x 的状态为感染者;
9 end
10 else if x 是无症状感染者 then
11 if x 的无症状感染期已过 then
12 更新 x 的状态为康复者;
13 end
14 end
15 记录当前状态人数;
16 end
17 计算新增感染和新增恢复人数;
18 保存每日状态记录到 CSV 文件;
19 end
20 生成折线图,基于每1/时间步长个数据点求和,根据天数进行绘图;
21 定义 unique_id,以便所有输出文件以该 ID 命名;
SEIAR 模型适合在传染病暴发情境中进行模拟,易感个体在接触感染者或无症状感染者时有可能被感染;疾病在感染后有潜伏期(潜隐期),个体在此期间不可以传播病毒;存在无症状感染者,这些个体虽然没有明显症状,但仍然能够感染其他人;感染者在经过一段时间后会恢复,恢复后通常会获得免疫力,暴发期间不再被感染;疾病传播依赖于个体之间的接触网络,接触频率和接触模式会影响传播动态。该模型可用于流感、COVID-19、麻疹等传染病的暴发。
以下是关于ABM模型的设置、散点图绘制、GIF图合成、时间记录、输出结果保存为CSV文件、折线图绘制等在内的全部函数的实现,可以在主函数中根据需要调用这些函数,此外,所有的参数和各状态的初始值也可以从主函数中设置,方便进行不同地区、不同疾病的SEIAR模型模拟。
import matplotlib.pyplot as plt
import random
import pandas as pd
import numpy as np
import imageio.v2 as imageio
import os
import time
# 定义模型
class SEIARModel:
def __new__(cls, N, disease_distribution, params):
instance = super(SEIARModel, cls).__new__(cls)
instance.num_agents = N
instance.data = []
# 初始化个体属性
instance.status = np.zeros(N, dtype=int) # 状态数组
instance.days_in_state = np.zeros(N, dtype=float)
instance.positions = np.random.randint(1, 100, size=(N, 2)) # 每个个体的坐标 [x, y]
instance.asymptomatic_infection = None
instance.w1 = None
instance.w2 = None
instance.r1 = None
instance.r2 = None
instance.p = None
instance.q = None
instance.kappa = None
instance.time_step = None
instance.previous_status = None
return instance
def initialize(self, params, disease_distribution):
self.asymptomatic_infection = np.random.rand(self.num_agents) < params['p']
self.w1 = np.clip(np.random.normal(params['w1_mean'], params['w1_std'], size=self.num_agents), 3, 12).astype(int)
self.w2 = np.clip(np.random.normal(params['w2_mean'], params['w2_std'], \
size=self.num_agents), 2, 12).astype(int)
self.r1 = np.clip(np.random.normal(params['r1_mean'],params['r1_std'],\
size=self.num_agents), 2, 24).astype(int)
self.r2 = np.clip(np.random.normal(params['r2_mean'], params['r2_std'], \
size=self.num_agents), 2, 24).astype(int)
self.p = params['p']
self.q = params['q']
self.kappa = params['kappa']
self.time_step = params['time_step']
self.previous_data = None
# 根据疾病分布向量初始化个体状态
statuses = (
[0] * disease_distribution[0] + # S
[1] * disease_distribution[1] + # E
[2] * disease_distribution[2] + # I
[3] * disease_distribution[3] + # A
[4] * disease_distribution[4] # R
)
random.shuffle(statuses) # 随机打乱状态分配
self.status[:len(statuses)] = statuses # 设置状态
def UpdateStatusAndPosition(self):
# 随机移动
self.positions += np.random.choice([-1, 0, 1], size=self.positions.shape) # 更新位置
self.positions = np.clip(self.positions, 0, 99) # 确保坐标在有效范围内
# 检查邻居状态
for i in range(self.num_agents):
if self.status[i] == 0: # 健康状态
# 计算与其他个体的接触
distances = np.sqrt((self.positions[i, 0] - self.positions[:, 0]) ** 2 + (self.positions[i, 1] - self.positions[:, 1]) ** 2)
neighbors = (distances <= 1.5) & (self.status != 0)
# 感染逻辑
if np.any(neighbors):
if np.any((self.status[neighbors] == 2) & (np.random.rand(np.sum(neighbors)) < self.q * self.time_step)):
self.status[i] = 1
if np.any((self.status[neighbors] == 3) & (np.random.rand(np.sum(neighbors)) < self.kappa * self.q * self.time_step)):
self.status[i] = 1
# 状态更新逻辑
latent_mask = (self.status == 1) # 潜伏状态
asymptomatic_mask = (self.status == 3) # 无症状感染状态
infected_mask = (self.status == 2) # 感染状态
# 更新潜伏状态
self.days_in_state[latent_mask] += self.time_step
self.status[latent_mask & (self.asymptomatic_infection & (self.days_in_state >= self.w2))] = 3 # 转为无症状感染
self.status[latent_mask & (~self.asymptomatic_infection & (self.days_in_state >= self.w1))] = 2 # 转为感染
# 更新无症状感染状态
self.days_in_state[asymptomatic_mask] += self.time_step
self.status[asymptomatic_mask & (self.days_in_state >= self.r2)] = 4 # 转为康复
self.days_in_state[asymptomatic_mask & (self.days_in_state >= self.r2)] = 0
# 更新感染状态
self.days_in_state[infected_mask] += self.time_step
self.status[infected_mask & (self.days_in_state >= self.r1)] = 4 # 转为康复
self.days_in_state[infected_mask & (self.days_in_state >= self.r1)] = 0
def record_data(self):
s_count = np.sum(self.status == 0) # 健康
e_count = np.sum(self.status == 1) # 潜伏
i_count = np.sum(self.status == 2) # 感染
a_count = np.sum(self.status == 3) # 无症状感染
r_count = np.sum(self.status == 4) # 康复
if self.previous_data is not None:
new_infections = (i_count - self.previous_data['I']) + (a_count - self.previous_data['A']) + (r_count - self.previous_data['R'])
new_recoveries = r_count - self.previous_data['R']
else:
new_infections = 0
new_recoveries = 0
# 记录当前数据
self.data.append({
"S": s_count,
"E": e_count,
"I": i_count,
"A": a_count,
"R": r_count,
"new_infections": new_infections,
"new_recoveries": new_recoveries
})
# 更新上一个数据
self.previous_data = {
'S': s_count,
'E': e_count,
'I': i_count,
'A': a_count,
'R': r_count
}
def plot_agents(model, day, output_dir, unique_id):
fig, ax = plt.subplots(figsize=(8, 8))
ax.set_xlim(0, 100)
ax.set_ylim(0, 100)
ax.set_title(f"step {day}")
# 根据状态分配颜色和标签
colors = {
0: 'blue', # 健康
1: 'blue', # 潜伏
2: 'red', # 感染
3: 'orange', # 无症状感染
4: 'green' # 康复
}
labels = {
0: 'S',
1: 'E',
2: 'I',
3: 'A',
4: 'R'
}
for status, color in colors.items():
mask = model.status == status
ax.scatter(model.positions[mask, 0], model.positions[mask, 1], c=color, label=status, alpha=0.5)
ax.legend(loc='upper left')
plt.savefig(f"{output_dir}/frame_{day}_{unique_id}.png")
plt.close(fig)
def create_gif_with_timing(model, steps, unique_id):
# 计时
start_time = time.time()
output_dir = "simulation_frames"
if not os.path.exists(output_dir):
os.makedirs(output_dir)
all_frames = []
gif_filename = f"seiar_spread_simulation_{unique_id}.gif"
for day in range(steps):
model.UpdateStatusAndPosition()
model.record_data()
plot_agents(model, day, output_dir, unique_id)
all_frames.append(imageio.imread(f"{output_dir}/frame_{day}_{unique_id}.png"))
imageio.mimsave(gif_filename, all_frames, duration=0.5)
end_time = time.time()
total_time = end_time - start_time
average_time_per_iteration = total_time / steps
print(f"Total simulation time: {total_time:.2f} seconds")
print(f"Average time per iteration: {average_time_per_iteration:.4f} seconds")
# 保存为CSV文件方便分析
def save_to_csv(model, filename):
data_to_save = []
for day in range(len(model.data)):
counts = model.data[day]
data_to_save.append({
"Day": day,
"S": counts['S'],
"E": counts['E'],
"I": counts['I'],
"A": counts['A'],
"R": counts['R'],
"new_infections": counts['new_infections'],
"new_recoveries": counts['new_recoveries']
})
df = pd.DataFrame(data_to_save)
df.to_csv(filename, index=False)
def run_simulation(model, steps, unique_id):
# 计时
start_time = time.time()
for day in range(steps):
model.UpdateStatusAndPosition()
model.record_data()
end_time = time.time()
total_time = end_time - start_time
average_time_per_iteration = total_time / steps
print(f"Total simulation time: {total_time:.2f} seconds")
print(f"Average time per iteration: {average_time_per_iteration:.4f} seconds")
# 绘制折线图
def plot_daily_status_and_infections(csv_file, params, unique_id):
# 导入数据
df = pd.read_csv(csv_file)
plt.figure(figsize=(10, 10))
# 绘制每日各状态人数
plt.subplot(2, 1, 1)
step = round(1 / params['time_step'])
plt.plot(df['Day'][::step], df['S'][::step], label='Susceptible (S)', color='blue')
plt.plot(df['Day'][::step], df['E'][::step], label='Exposed (E)', color='cyan')
plt.plot(df['Day'][::step], df['I'][::step], label='Infected (I)', color='red')
plt.plot(df['Day'][::step], df['A'][::step], label='Asymptomatic (A)', color='orange')
plt.plot(df['Day'][::step], df['R'][::step], label='Recovered (R)', color='green')
plt.title('Daily Status Counts')
plt.xlabel('Days')
plt.ylabel('Count')
plt.legend(loc='upper right')
x_ticks = df['Day'][::step] * params['time_step']
plt.xticks(ticks=df['Day'][::step], labels=(df['Day'][::step] * params['time_step']).astype(int), rotation=45)
# 绘制每日新增感染和新增恢复人数
plt.subplot(2, 1, 2)
new_infections_summed = df['new_infections'].values.reshape(-1, step).sum(axis=1)
new_recoveries_summed = df['new_recoveries'].values.reshape(-1, step).sum(axis=1)
new_days = df['Day'][::step][:len(new_infections_summed)]
plt.plot(new_days, new_infections_summed, label='New Infections', color='red')
plt.plot(new_days, new_recoveries_summed, label='New Recoveries', color='green')
plt.title('Daily New Infections and Recoveries')
plt.xlabel('Days')
plt.ylabel('Count')
plt.legend()
plt.xticks(ticks=df['Day'][::step], labels=(df['Day'][::step] * params['time_step']).astype(int), rotation=45)
plt.tight_layout()
output_file = f"seiar_spread_{unique_id}.png"
plt.savefig(output_file)
plt.show()
这是第一套参数的个体随机模型实现,为了防止多套参数模拟后运行结果被覆盖,它所有的结果都命名为编号1,用户可以通过更改以下这些参数、初始值和时间步来调整模型达到预期效果。
if __name__ == "__main__":
unique_id = 1
disease_distribution = [200, 50, 20, 30, 0] # [S, E, I, A, R]
# 集中参数设置
N = sum(disease_distribution)
params = {
'w1_mean': 7, # 潜伏期
'w1_std': 4,
'w2_mean': 9, # 潜隐期
'w2_std': 6,
'r1_mean': 16, # I的恢复期
'r1_std': 6,
'r2_mean': 12, # A的恢复期
'r2_std': 5,
'p': 0.3, # 无症状感染的概率
'q': 0.5, # 单次接触感染概率
'kappa': 0.7, # 无症状感染者的感染概率调整因子
'time_step': 0.1, # 时间步长
'total_days': 30 # 模拟的天数
}
model = SEIARModel(N, disease_distribution, params)
model.initialize(params, disease_distribution)
# 设置模型的时间步
steps = int(params['total_days'] * (1 / params['time_step']))
# 计时
run_simulation(model, steps, unique_id)
# create_gif_with_timing(model, steps, unique_id)
# 保存至 CSV并绘图
save_to_csv(model, f"daily_status_data_{unique_id}.csv")
plot_daily_status_and_infections(f"daily_status_data_{unique_id}.csv", params, unique_id)
Total simulation time是此次SEIAR模型运行的全部用时,不包括画散点图和GIF图的时间,如需要绘制散点图和动图(可用于汇报展示),可以运行计时部分的被注释代码。
Average time per iteration就是运行每个时间步的平均用时,这次运行一共30*10个迭代。
Total simulation time: 4.25 seconds
Average time per iteration: 0.0142 seconds
下图展示了每天处于各状态的人数以及每天新增的感染人数(包括有症状和无症状感染)和每天新增的恢复人数(包括有症状和无症状感染),因为个体随机模型的特点就是随机,因此折线图并不平滑,可能需要多次重复模拟求平均值会相对平滑。
图1 SEIAR模型第一套参数模拟结果
下图是模拟动图中的一帧图像,直观展示了模拟的空间和各个个体所处的位置,动图可以更清晰地展示个体间的接触和状态转换,在汇报展示和宣传时更便于观众理解。
图2 SEIAR模型第一套参数模拟散点图
这是第二套参数的个体随机模型实现,它所有的结果都命名为编号2,以下这些参数、初始值和时间步相对于上一套都有调整。
if __name__ == "__main__":
unique_id = 2
disease_distribution = [200, 100, 10, 30, 0] # [S, E, I, A, R]
# 集中参数设置
N = sum(disease_distribution)
params = {
'w1_mean': 5, # 潜伏期
'w1_std': 3,
'w2_mean': 7, # 潜隐期
'w2_std': 6,
'r1_mean': 14, # I的恢复期
'r1_std': 4,
'r2_mean': 10, # A的恢复期
'r2_std': 5,
'p': 0.5, # 无症状感染的概率
'q': 0.4, # 单次接触感染概率
'kappa': 0.5, # 无症状感染者的感染概率调整因子
'time_step': 0.2, # 时间步长
'total_days': 30 # 模拟的天数
}
model = SEIARModel(N, disease_distribution, params)
model.initialize(params, disease_distribution)
# 设置模型的时间步
steps = int(params['total_days'] * (1 / params['time_step']))
# 计时
run_simulation(model, steps, unique_id)
# create_gif_with_timing(model, steps, unique_id)
# 保存至 CSV并绘图
save_to_csv(model, f"daily_status_data_{unique_id}.csv")
plot_daily_status_and_infections(f"daily_status_data_{unique_id}.csv", params, unique_id)
Total simulation time是此次SEIAR模型运行的全部用时,如需要绘制散点图和动图(可用于汇报展示),可以运行计时部分的被注释代码。
Average time per iteration就是运行每个时间步的平均用时,这次运行一共30*5个迭代。
Total simulation time: 1.63 seconds
Average time per iteration: 0.0109 seconds
下图展示了每天处于各状态的人数以及每天新增的感染人数(包括有症状和无症状感染)和每天新增的恢复人数(包括有症状和无症状感染),因为个体随机模型的特点就是随机,因此折线图并不平滑,可能需要多次重复模拟求平均值会相对平滑。
图3 SEIAR模型第一套参数模拟结果
在 SEIAR 模型中,各主要参数对模拟结果有显著影响。易感者(S)的数量越多,疫情传播潜力越大;潜伏者(E)数量增加会导致隐蔽传播,增加控制难度;感染者(I)数量直接影响传播速度,无症状感染者(A)则增加了疫情的复杂性,A增多会导致暴发期延长;恢复者(R)的数量影响群体免疫的形成,R增多会结束暴发;总人口数(N)则决定了传播范围。
潜伏期(w1)和潜隐期(w2)越长,识别感染者的时间越晚,传播风险增加;感染者和无症状感染者的恢复时间(r1 和 r2)越长,传播时间也越长。
无症状感染者比例(p)和单次接触感染概率(q)越高,疫情传播风险越大。时间步长(time_step)影响模拟精度。
以第一套参数为例,个体每天平均接触别的个体的次数c为2.4次,病程D在模型中为r1和r2,按照公式R0=q×c×D,q=R0/(c×D)已知R0可以求得单次接触感染概率q。
c=8×10×300/(100×100)=2.4