使用Python实现基于离散网格坐标的带隔离措施的SIR个体随机模型,模拟流行病传播过程
1.一例病例与易感者接触可能会被感染,单次接触感染概率为q。
2.病例的恢复时间分别为r1,在本模型中也被设置为一个正态分布的随机数字,正态分布的均数和标准差可根据实际情况进行设定;
5.恢复者在研究期间内具有免疫力,不会被感染;
6.不考虑种群流动性,不考虑自然出生和死亡,不考虑病死,本模型主要用于短期暴发的模拟。
带隔离措施的易感-感染-康复模型(Susceptible-Infectious-Recovered Model,SIR Model)的个体随机模型(ABM)可用于描述传染病传播。该模型将个体分为易感者(S)、感染者(I)和康复者(R)。在模型中,易感者可能因接触感染者而被感染,感染者则表现出症状并积极传播病毒,感染后在指定时间后有几率被隔离起来,防止其传播病毒,康复者是从感染中恢复并获得免疫力的个体。通过定义个体之间的交互和状态转变,SIR模型能够模拟疾病在群体中的传播动态,帮助研究人员分析隔离开始时间和隔离率对疾病暴发的影响。
表1 模型参数及其含义
参数 | 含义 | 取值1 | 取值2 |
---|---|---|---|
S(Susceptible) | 易感者 | 200 | 200 |
I(Infectious) | 感染者 | 50 | 100 |
R(Recovered) | 恢复者 | 0 | 0 |
N | 总人口数 | 250 | 300 |
r1 | 感染者恢复时间 | 14,4 | 16,4 |
r2 | 被隔离的感染者的恢复时间 | 10,4 | 12,4 |
q | 单次接触感染概率 | 0.5 | 0.5 |
p | 感染者被隔离的概率 | 0.5 | 0.5 |
t | 感染者被隔离的开始时间(感染后第几天) | 3 | 1 |
time_step | 时间步长 | 0.2 | 0.1 |
total_days | 模拟总天数 | 30 | 30 |
输入:个体数 N,疾病分布 disease_distribution,参数 params
输出:每日状态人数、新增感染和新增恢复人数
1 初始化 SIRModel 类,设置个体数 N 和疾病分布;
2 初始化个体状态(健康、感染、康复);
3 初始化每日状态记录列表;
4 for 每个时间步 t 从 0 到 T do
5 for 每个个体 x do
6 更新个体 x 的位置;
7 if x 是健康者 then
8 计算与其他个体的接触;
9 if 有邻居感染者 then
10 if 感染条件满足 then
11 更新 x 的状态为感染者;
12 end
13 end
14 end
15 更新感染者的状态;
16 检查隔离逻辑;
17 更新被隔离个体的状态;
18 记录当前状态人数;
19 end
20 计算新增感染和新增恢复人数;
21 保存每日状态记录到 CSV 文件;
22 end
23 生成折线图,基于每日状态数据绘图;
24 定义 unique_id,以便所有输出文件以该 ID 命名;
带隔离措施的 SIR 模型适合在传染病暴发情境中进行模拟,尤其适用于流感、COVID-19、麻疹等传染病。该模型能够反映易感个体在接触感染者时被感染的可能性,并考虑感染者的隔离措施对传播的影响。感染者在经过一段时间后会恢复并通常获得免疫力,恢复后的个体不再被感染。此外,疾病传播依赖于个体之间的接触网络,接触频率和接触模式会影响传播动态。模型的灵活性允许通过调整时间步长来模拟不同的传播速度和隔离效果,适用于需要细致观察传播动态的情况。
以下是关于ABM模型的设置、散点图绘制、GIF图合成、时间记录、输出结果保存为CSV文件、折线图绘制等在内的全部函数的实现,可以在主函数中根据需要调用这些函数,此外,所有的参数和各状态的初始值也可以从主函数中设置,方便进行不同地区、不同疾病的带隔离措施的SIR模型模拟。
import matplotlib.pyplot as plt
import random
import pandas as pd
import numpy as np
import imageio.v2 as imageio
import os
import time
# 定义模型
class SIRModel:
def __new__(cls, N, disease_distribution, params):
instance = super(SIRModel, cls).__new__(cls)
instance.num_agents = N
instance.data = []
# 初始化个体属性
instance.status = np.zeros(N, dtype=int) # 状态数组
instance.days_in_state = np.zeros(N, dtype=float)
instance.positions = np.random.randint(1, 100, size=(N, 2)) # 每个个体的坐标 [x, y]
instance.is_isolated = np.zeros(N, dtype=int) # 隔离状态
instance.r = None
instance.p = None
instance.q = None # 隔离概率
instance.time_step = None
instance.previous_status = None
return instance
def initialize(self, params, disease_distribution):
self.r1 = np.clip(np.random.normal(params['r1_mean'], params['r1_std'], size=self.num_agents), 2, 24).astype(int)
self.r2 = np.clip(np.random.normal(params['r2_mean'], params['r2_std'], size=self.num_agents), 2, 24).astype(int)
self.p = params['p']
self.q = params['q']
self.t = params['t']
self.time_step = params['time_step']
self.previous_data = None
# 根据疾病分布向量初始化个体状态
statuses = (
[0] * disease_distribution[0] + # S
[1] * disease_distribution[1] + # I
[2] * disease_distribution[2] # R
)
random.shuffle(statuses) # 随机打乱状态分配
self.status[:len(statuses)] = statuses # 设置状态
def UpdateStatusAndPosition(self):
# 随机移动
self.positions += np.random.choice([-1, 0, 1], size=self.positions.shape) # 更新位置
self.positions = np.clip(self.positions, 0, 99) # 确保坐标在有效范围内
# 检查邻居状态
for i in range(self.num_agents):
if self.status[i] == 0:
# 计算与其他个体的接触
distances = np.sqrt((self.positions[i, 0] - self.positions[:, 0]) ** 2 + (self.positions[i, 1] - self.positions[:, 1]) ** 2)
neighbors = (distances <= 1.5) & (self.status == 1) & (self.is_isolated == 0) # 只考虑未隔离的感染者
# 感染逻辑
if np.any(neighbors):
if np.any((self.status[neighbors] == 1) & (np.random.rand(np.sum(neighbors)) < self.q * self.time_step)):
self.status[i] = 1
# 隔离逻辑
infected_agents = (self.status == 1) # 找到所有感染者
random_values = np.random.rand(self.num_agents) # 生成随机数
# 仅在感染后经过设定的隔离时间后,才有可能被隔离
self.is_isolated[infected_agents] = (self.days_in_state[infected_agents] >= self.t) & (random_values[infected_agents] < self.p)
# 状态更新逻辑
infected_mask = (self.status == 1) # 感染状态
# 更新感染状态
self.days_in_state[infected_mask] += self.time_step
# 处理隔离个体的恢复
isolated_mask = (self.is_isolated == 1) # 找到所有被隔离的个体
recovered_mask = isolated_mask & (self.days_in_state >= self.r1) # 找到达到恢复期的个体
# 更新状态
self.status[recovered_mask] = 2 # 转为康复
self.is_isolated[recovered_mask] = 0 # 解除隔离
self.days_in_state[recovered_mask] = 0 # 重置天数
# 更新非隔离感染者的状态
non_isolated_infected_mask = infected_mask & (self.days_in_state >= self.r2) # 找到非隔离感染者
self.status[non_isolated_infected_mask] = 2 # 转为康复
self.days_in_state[non_isolated_infected_mask] = 0 # 重置天数
def record_data(self):
s_count = np.sum(self.status == 0) # 健康
i_count = np.sum(self.status == 1) # 感染(包括隔离者)
r_count = np.sum(self.status == 2) # 康复
if self.previous_data is not None:
new_infections = self.previous_data['S'] - s_count
new_recoveries = r_count - self.previous_data['R']
else:
new_infections = 0
new_recoveries = 0
# 记录当前数据
self.data.append({
"S": s_count,
"I": i_count,
"R": r_count,
"new_infections": new_infections,
"new_recoveries": new_recoveries
})
# 更新上一个数据
self.previous_data = {
'S': s_count,
'I': i_count,
'R': r_count
}
def plot_agents(model, day, output_dir, unique_id):
fig, ax = plt.subplots(figsize=(8, 8))
ax.set_xlim(0, 100)
ax.set_ylim(0, 100)
ax.set_title(f"step {day}")
# 根据状态分配颜色和标签
colors = {
0: 'blue', # 健康
1: 'red', # 感染
2: 'green' # 康复
}
labels = {
0: 'S',
1: 'I',
2: 'R'
}
for status, color in colors.items():
mask = model.status == status
ax.scatter(model.positions[mask, 0], model.positions[mask, 1], c=color, label=status, alpha=0.5)
ax.legend(loc='upper left')
plt.savefig(f"{output_dir}/frame_{day}_{unique_id}.png")
plt.close(fig)
def create_gif_with_timing(model, steps, unique_id):
start_time = time.time()
output_dir = "simulation_frames"
if not os.path.exists(output_dir):
os.makedirs(output_dir)
all_frames = []
gif_filename = f"seiar_spread_simulation_{unique_id}.gif"
for day in range(steps):
model.UpdateStatusAndPosition()
model.record_data()
plot_agents(model, day, output_dir, unique_id)
all_frames.append(imageio.imread(f"{output_dir}/frame_{day}_{unique_id}.png"))
imageio.mimsave(gif_filename, all_frames, duration=0.5)
end_time = time.time()
total_time = end_time - start_time
average_time_per_iteration = total_time / steps
print(f"Total simulation time: {total_time:.2f} seconds")
print(f"Average time per iteration: {average_time_per_iteration:.4f} seconds")
# 保存为CSV文件方便分析
def save_to_csv(model, filename):
data_to_save = []
for day in range(len(model.data)):
counts = model.data[day]
data_to_save.append({
"Day": day,
"S": counts['S'],
"I": counts['I'],
"R": counts['R'],
"new_infections": counts['new_infections'],
"new_recoveries": counts['new_recoveries']
})
df = pd.DataFrame(data_to_save)
df.to_csv(filename, index=False)
def run_simulation(model, steps, unique_id):
start_time = time.time()
for day in range(steps):
model.UpdateStatusAndPosition()
model.record_data()
end_time = time.time()
total_time = end_time - start_time
average_time_per_iteration = total_time / steps
print(f"Total simulation time: {total_time:.2f} seconds")
print(f"Average time per iteration: {average_time_per_iteration:.4f} seconds")
# 绘制折线图
def plot_daily_status_and_infections(csv_file, params, unique_id):
df = pd.read_csv(csv_file)
plt.figure(figsize=(10, 10))
# 绘制每日各状态人数
plt.subplot(2, 1, 1)
step = round(1 / params['time_step'])
plt.plot(df['Day'][::step], df['S'][::step], label='Susceptible (S)', color='blue')
plt.plot(df['Day'][::step], df['I'][::step], label='Infected (I)', color='red')
plt.plot(df['Day'][::step], df['R'][::step], label='Recovered (R)', color='green')
plt.title('a.Daily Status Counts')
plt.xlabel('Days')
plt.ylabel('Count')
plt.legend(loc='upper right')
x_ticks = df['Day'][::step] * params['time_step']
plt.xticks(ticks=df['Day'][::step], labels=(df['Day'][::step] * params['time_step']).astype(int), rotation=45)
# 绘制每日新增感染和新增恢复人数
plt.subplot(2, 1, 2)
new_infections_summed = df['new_infections'].values.reshape(-1, step).sum(axis=1)
new_recoveries_summed = df['new_recoveries'].values.reshape(-1, step).sum(axis=1)
new_days = df['Day'][::step][:len(new_infections_summed)]
plt.plot(new_days, new_infections_summed, label='New Infections', color='red')
plt.plot(new_days, new_recoveries_summed, label='New Recoveries', color='green')
plt.title('b.Daily New Infections and Recoveries Counts')
plt.xlabel('Days')
plt.ylabel('Count')
plt.legend()
plt.xticks(ticks=df['Day'][::step], labels=(df['Day'][::step] * params['time_step']).astype(int), rotation=45)
plt.tight_layout()
output_file = f"seiar_spread_{unique_id}.png"
plt.savefig(output_file)
plt.show()
这是第一套参数的个体随机模型实现,为了防止多套参数模拟后运行结果被覆盖,它所有的结果都命名为编号1,用户可以通过更改以下这些参数、初始值和时间步来调整模型达到预期效果。
if __name__ == "__main__":
unique_id = 1
disease_distribution = [200, 50, 0] # [S, I, R]
# 集中参数设置
N = sum(disease_distribution)
params = {
'r1_mean': 14, # I的恢复期
'r1_std': 4,
'r2_mean': 10, # 被隔离I的恢复期
'r2_std': 4,
'q': 0.5, # 感染概率
'p': 0.5, # 隔离概率
'time_step': 0.2, # 时间步长
'total_days': 30, # 模拟的天数
't':3*(1 / params['time_step']) # 开始隔离时间
}
model = SIRModel(N, disease_distribution, params)
model.initialize(params, disease_distribution)
# 设置模型的时间步
steps = int(params['total_days'] * (1 / params['time_step']))
# 计时
run_simulation(model, steps, unique_id)
# create_gif_with_timing(model, steps, unique_id)
# 保存至 CSV并绘图
save_to_csv(model, f"daily_status_data_{unique_id}.csv")
plot_daily_status_and_infections(f"daily_status_data_{unique_id}.csv", params, unique_id)
Total simulation time是此次SIR模型运行的全部用时,不包括画散点图和GIF图的时间,如需要绘制散点图和动图(可用于汇报展示),可以运行计时部分的被注释代码。 Average time per iteration就是运行每个时间步的平均用时,这次运行一共30*5个迭代。
Total simulation time: 1.51 seconds
Average time per iteration: 0.0101 seconds
下图展示了每天处于各状态的人数以及每天新增的感染人数和每天新增的恢复人数,个体随机模型的特点就是随机,因此折线图并不平滑,可能需要多次重复模拟求平均值会相对平滑。
图1 带隔离措施的SIR模型第一套参数模拟结果
下图是模拟动图中的一帧图像,直观展示了模拟的空间和各个个体所处的位置,动图可以更清晰地展示个体间的接触和状态转换,在汇报展示和宣传时更便于观众理解。
图2 带隔离措施的SIR模型第一套参数模拟散点图
这是第二套参数的个体随机模型实现,它所有的结果都命名为编号2,以下这些参数、初始值和时间步相对于上一套都有调整。
if __name__ == "__main__":
unique_id = 2
disease_distribution = [200, 100, 0] # [S, I, R]
# 集中参数设置
N = sum(disease_distribution)
params = {
'r1_mean': 16, # I的恢复期
'r1_std': 4,
'r2_mean': 12, # 被隔离I的恢复期
'r2_std': 4,
'q': 0.5, # 感染概率
'p': 0.3, # 隔离概率
'time_step': 0.1, # 时间步长
'total_days': 30, # 模拟的天数
't':1*(1 / params['time_step']) # 开始隔离时间
}
model = SIRModel(N, disease_distribution, params)
model.initialize(params, disease_distribution)
# 设置模型的时间步
steps = int(params['total_days'] * (1 / params['time_step']))
# 计时
run_simulation(model, steps, unique_id)
# create_gif_with_timing(model, steps, unique_id)
# 保存至 CSV并绘图
save_to_csv(model, f"daily_status_data_{unique_id}.csv")
plot_daily_status_and_infections(f"daily_status_data_{unique_id}.csv",
Total simulation time是此次SIR模型运行的全部用时,如需要绘制散点图和动图(可用于汇报展示),可以运行计时部分的被注释代码。 Average time per iteration就是运行每个时间步的平均用时,这次运行一共30*10个迭代。
Total simulation time: 3.05 seconds
Average time per iteration: 0.0102 seconds
下图展示了每天处于各状态的人数以及每天新增的感染人数和每天新增的恢复人数,折线图并不平滑,可能需要多次重复模拟求平均值会相对平滑。
图3 带隔离措施的SIR模型第一套参数模拟结果
在 SIR 模型中,各主要参数对模拟结果有显著影响。易感者(S)的数量越多,疫情传播潜力越大;感染者(I)数量直接影响传播速度;恢复者(R)的数量影响群体免疫的形成,R增多会结束暴发;总人口数(N)则决定了传播范围。
感染者的恢复时间(r1 )越长,传播时间也越长,暴发影响的人数就会增多;隔离者的恢复时间(r2)越短,说明隔离的效果越好,隔离会使暴发更早结束。
单次接触感染概率(q)越高,疫情传播风险越大。时间步长(time_step)影响模拟精度。
隔离概率(p)和隔离开始时间(t)是影响隔离效果的因素,隔离概率越高、隔离开始时间越早,暴发会越快被平息。
以第一套参数为例,个体每天平均接触别的个体的次数c为1次,自然病程D在模型中为r1,按照公式R0=q×c×D,q=R0/(c×D)已知R0可以求得单次接触感染概率q。
c=8×5×250/(100×100)=1