通过 TOPSIS 赋权优化多模型融合的 SHAP 解释提升模型透明性

Xing Abao Lv3

本示例其核心目标是构建一个高性能的分类预测模型。整个流程始于数据准备,历经多个独立模型的训练与优化,最终通过一种基于多标准决策分析的先进方法,将这些独立模型融合成一个性能更优的集成模型。

最后,实现了一个自定义的 EnsembleModel 类,用于执行加权模型融合。该类接收所有基模型以及通过TOPSIS计算出的权重。其核心的 predict_proba 方法会获取每个基模型对新数据的预测概率,然后根据TOPSIS权重对这些概率进行加权平均,从而得到一个综合的预测概率。最终的分类结果则基于这个加权概率(通常以 0.5 为阈值)来决定。这个集成模型同样经过了严格的性能评估,并与所有基模型进行了直观的比较。脚本通过绘制所有模型在测试集上的 ROC 曲线,将集成模型的曲线用醒目的颜色突出显示,清晰地展示了通过TOPSIS加权融合后的集成模型相较于任何单一基模型在性能上的显著提升。

整个流程的最终产物,包括所有优化后的基模型和最终的集成模型,都可以通过 joblib 库进行持久化保存,以便未来直接调用。

模拟案例

加载模块

1

加载数据

1
2
3
wkdir = 'C:/Users/Admins/Desktop'
dtdir = 'Z:/TData/big-data/sad41d8cd/251028_Multi_Model_SHAP_TOPSIS_Fusion'
df = pd.read_excel(f'{dtdir}/data.xlsx')
1
2
3
4
5
6
7
8
9
10
df.head()
Out[3]:
SystolicBP Hb ... AtrialFibrillationType Electrical_cardioversion
0 132.0 152.0 ... 1 1
1 97.0 132.0 ... 1 0
2 126.0 141.0 ... 1 1
3 112.0 105.0 ... 0 0
4 113.0 142.0 ... 1 1

[5 rows x 23 columns]
1
2
3
4
5
6
7
8
df.columns
Out[4]:
Index(['SystolicBP', 'Hb', 'LeftAtrialDiam', 'DiastolicBP', 'Age', 'BMI',
'SurgeryTime', 'Cr', 'CHA2DS2VASC', 'FT4', 'AFCourse', 'TSH',
'NtproBNP', 'ACEI_ARB_ARNI', 'Statin', 'SmokingHistory', 'HTN', 'B',
'Rivaroxaban', 'Sex', 'Dabigatran', 'AtrialFibrillationType',
'Electrical_cardioversion'],
dtype='object')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
df.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 256 entries, 0 to 255
Data columns (total 23 columns):
# Column Non-Null Count Dtype
--- ------ -------------- -----
0 SystolicBP 256 non-null float64
1 Hb 256 non-null float64
2 LeftAtrialDiam 256 non-null float64
3 DiastolicBP 256 non-null int64
4 Age 256 non-null int64
5 BMI 256 non-null float64
6 SurgeryTime 256 non-null float64
7 Cr 256 non-null int64
8 CHA2DS2VASC 256 non-null int64
9 FT4 256 non-null float64
10 AFCourse 256 non-null int64
11 TSH 256 non-null float64
12 NtproBNP 256 non-null int64
13 ACEI_ARB_ARNI 256 non-null int64
14 Statin 256 non-null int64
15 SmokingHistory 256 non-null int64
16 HTN 256 non-null int64
17 B 256 non-null int64
18 Rivaroxaban 256 non-null int64
19 Sex 256 non-null int64
20 Dabigatran 256 non-null int64
21 AtrialFibrillationType 256 non-null int64
22 Electrical_cardioversion 256 non-null int64
dtypes: float64(7), int64(16)
memory usage: 46.1 KB

划分数据

1
2
3
4
5
6
7
8
9
10
11
12
# 划分特征和目标变量
X = df.drop(['Electrical_cardioversion'], axis = 1)
y = df['Electrical_cardioversion']

# 划分训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(
X,
y,
test_size = 0.3,
random_state = 42,
stratify = df['Electrical_cardioversion']
)

数据过采样

1
2
3
4
5
6
7
# 导入`SMOTE`算法用于数据过采样
# `sampling_strategy = 1` 表示将少数类样本的数量增加到与多数类相同, 即使样本平衡
# `k_neighbors = 20` 表示用于生成合成样本时使用 20 个最近邻点, SMOTE 算法将基于这些邻居生成新的样本
smote = SMOTE(sampling_strategy = 1, k_neighbors = 20, random_state = 42)

# 使用`SMOTE`对训练集数据进行过采样, 生成新的平衡数据集
X_SMOTE_train, y_SMOTE_train = smote.fit_resample(X_train, y_train)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
X_SMOTE_train
Out[46]:
SystolicBP Hb ... Dabigatran AtrialFibrillationType
0 116.000000 149.000000 ... 1 0
1 124.000000 140.000000 ... 1 0
2 117.000000 137.000000 ... 1 1
3 140.000000 100.000000 ... 0 0
4 110.000000 129.000000 ... 0 0
.. ... ... ... ... ...
239 117.717327 161.630396 ... 1 1
240 97.276607 123.184404 ... 0 0
241 121.418391 151.831139 ... 0 1
242 134.716083 117.602548 ... 1 1
243 137.162555 125.486900 ... 0 1

[244 rows x 22 columns]

逐个加载每个模型

1
2
3
4
5
6
7
BernoulliNB = joblib.load(f'{dtdir}/BernoulliNB.pkl')
DecisionTree = joblib.load(f'{dtdir}/DecisionTree.pkl')
GradientBoosting = joblib.load(f'{dtdir}/GradientBoosting.pkl')
LightGBM = joblib.load(f'{dtdir}/LightGBM.pkl')
RandomForest = joblib.load(f'{dtdir}/RandomForest.pkl')
SVM = joblib.load(f'{dtdir}/SVM.pkl')
XGBoost = joblib.load(f'{dtdir}/XGBoost.pkl')

加载集成模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# 定义 EnsembleModel 类
class EnsembleModel(BaseEstimator, ClassifierMixin):
def __init__(self, models, weights):
"""
初始化集成模型。
:param models: 字典类型,包含每个基学习器模型
:param weights: 模型权重数组(通过TOPSIS的百分比占比得到)
"""
self.models = models
self.weights = np.array(weights) # 权重 (百分比占比)

def fit(self, X, y):
"""
集成模型的fit方法,基学习器会进行训练。
"""
for model in self.models.values():
model.fit(X, y) # 为每个模型调用 fit 方法
return self

def predict_proba(self, X):
"""
返回加权后的预测概率。
:param X: 测试数据集
:return: 返回两个类别的加权预测概率,格式为 (n_samples, 2)
"""
probas = np.zeros((X.shape[0], 2)) # 创建一个二维数组,每行有两个值,分别表示类别0和类别1的概率

# 获取每个模型的预测概率
for i, model in enumerate(self.models.values()):
model_proba = model.predict_proba(X) # 获取模型的预测概率
probas[:, 0] += model_proba[:, 0] * self.weights[i] # 加权类别 0 的概率
probas[:, 1] += model_proba[:, 1] * self.weights[i] # 加权类别 1 的概率

return probas # 返回加权后的两个类别的概率

def predict(self, X):
"""
返回预测结果,基于加权后的预测概率判断类别。
:param X: 测试数据集
:return: 返回最终预测类别(0 或 1)
"""
weighted_proba = self.predict_proba(X)
return (weighted_proba[:, 1] >= 0.5).astype(int) # 如果加权概率 >= 0.5,则预测为类别1,否则为类别0

ensemble_model = joblib.load(f'{dtdir}/ensemble_model.pkl')

绘制 SHAP 热图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
explainer = shap.TreeExplainer(RandomForest)
shap_values = explainer.shap_values(X_test)

# 创建 shap.Explanation 对象
shap_explanation = shap.Explanation(
shap_values[:, :, 1], # 绘制类别 1
base_values = explainer.expected_value,
data = X_test,
feature_names = X_test.columns
)

# 绘制热图
plt.figure(figsize = (10, 5), dpi = 1200)
shap.plots.heatmap(shap_explanation, show = False)
plt.savefig("SHAP_numpy Sorted Feature Importance.png", bbox_inches = 'tight')
plt.close()

排序特征

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
feature_names = X_test.columns

# 存储每个模型的特征重要性分数
model_importances = {}

# 定义模型
models = {
"DecisionTree": DecisionTree,
"GradientBoosting": GradientBoosting,
"LightGBM": LightGBM,
"RandomForest": RandomForest,
"XGBoost": XGBoost
}

# 提取每个模型的特征重要性分数
for model_name, model in models.items():
if hasattr(model, 'feature_importances_'):
model_importances[model_name] = model.feature_importances_

# 将字典转换为 DataFrame,按特征名和模型名排列
importance_df = pd.DataFrame(model_importances, index=feature_names)

# 为了避免出现0或1,可以对每个特征的值加上一个小的常量
importance_df += 1e-6 # 添加一个小的常量,避免归一化结果为0

# 使用 MinMaxScaler 归一化特征重要性分数
scaler = MinMaxScaler()
normalized_importance_df = pd.DataFrame(
scaler.fit_transform(importance_df),
columns = importance_df.columns,
index = importance_df.index
)

# 计算每一行的特征重要性之和,作为 SUM 列
normalized_importance_df['SUM'] = normalized_importance_df.sum(axis = 1)

# 根据 SUM 列的值进行排名,使用 sort_values 按照 SUM 排序,并且加上 Rank 列
normalized_importance_df['Rank'] = normalized_importance_df['SUM'].rank(ascending = False, method = 'min')

# 根据 Rank 列进行排序
normalized_importance_df = normalized_importance_df.sort_values(by = 'Rank', ascending = True)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
normalized_importance_df.head(10)
Out[55]:
DecisionTree GradientBoosting ... SUM Rank
NtproBNP 1.000000 1.000000 ... 4.319592 1.0
LeftAtrialDiam 0.643856 0.674587 ... 3.708813 2.0
AtrialFibrillationType 0.898733 0.582124 ... 3.470479 3.0
BMI 0.163642 0.340327 ... 2.054366 4.0
Cr 0.170002 0.232424 ... 1.892492 5.0
SystolicBP 0.119194 0.264296 ... 1.848557 6.0
AFCourse 0.000000 0.275588 ... 1.790832 7.0
Hb 0.504348 0.211506 ... 1.638117 8.0
Age 0.079370 0.183152 ... 1.539752 9.0
SurgeryTime 0.262613 0.183058 ... 1.367238 10.0

[10 rows x 7 columns]

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# -*- coding: utf-8 -*-

import shap
import joblib
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.preprocessing import MinMaxScaler
from imblearn.over_sampling import SMOTE

plt.rcParams['font.family'] = 'Times New Roman'
plt.rcParams['axes.unicode_minus'] = False

import warnings
warnings.filterwarnings("ignore")


if __name__ == '__main__':

wkdir = 'C:/Users/Admins/Desktop'
dtdir = 'Z:/TData/big-data/sad41d8cd/251028_Multi_Model_SHAP_TOPSIS_Fusion'
df = pd.read_excel(f'{dtdir}/data.xlsx')

if True:
df.head()
df.columns
df.info()

# 划分数据
if True:

# 划分特征和目标变量
X = df.drop(['Electrical_cardioversion'], axis = 1)
y = df['Electrical_cardioversion']

# 划分训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(
X,
y,
test_size = 0.3,
random_state = 42,
stratify = df['Electrical_cardioversion']
)

# 据过采样
if True:

# 导入`SMOTE`算法用于数据过采样
# `sampling_strategy = 1` 表示将少数类样本的数量增加到与多数类相同, 即使样本平衡
# `k_neighbors = 20` 表示用于生成合成样本时使用 20 个最近邻点, SMOTE 算法将基于这些邻居生成新的样本
smote = SMOTE(sampling_strategy = 1, k_neighbors = 20, random_state = 42)

# 使用`SMOTE`对训练集数据进行过采样, 生成新的平衡数据集
X_SMOTE_train, y_SMOTE_train = smote.fit_resample(X_train, y_train)

# 逐个加载每个模型
if True:

BernoulliNB = joblib.load(f'{dtdir}/BernoulliNB.pkl')
DecisionTree = joblib.load(f'{dtdir}/DecisionTree.pkl')
GradientBoosting = joblib.load(f'{dtdir}/GradientBoosting.pkl')
LightGBM = joblib.load(f'{dtdir}/LightGBM.pkl')
RandomForest = joblib.load(f'{dtdir}/RandomForest.pkl')
SVM = joblib.load(f'{dtdir}/SVM.pkl')
XGBoost = joblib.load(f'{dtdir}/XGBoost.pkl')

# 加载集成模型
if True:

# 定义 EnsembleModel 类
class EnsembleModel(BaseEstimator, ClassifierMixin):
def __init__(self, models, weights):
"""
初始化集成模型。
:param models: 字典类型,包含每个基学习器模型
:param weights: 模型权重数组(通过TOPSIS的百分比占比得到)
"""
self.models = models
self.weights = np.array(weights) # 权重 (百分比占比)

def fit(self, X, y):
"""
集成模型的fit方法,基学习器会进行训练。
"""
for model in self.models.values():
model.fit(X, y) # 为每个模型调用 fit 方法
return self

def predict_proba(self, X):
"""
返回加权后的预测概率。
:param X: 测试数据集
:return: 返回两个类别的加权预测概率,格式为 (n_samples, 2)
"""
probas = np.zeros((X.shape[0], 2)) # 创建一个二维数组,每行有两个值,分别表示类别0和类别1的概率

# 获取每个模型的预测概率
for i, model in enumerate(self.models.values()):
model_proba = model.predict_proba(X) # 获取模型的预测概率
probas[:, 0] += model_proba[:, 0] * self.weights[i] # 加权类别 0 的概率
probas[:, 1] += model_proba[:, 1] * self.weights[i] # 加权类别 1 的概率

return probas # 返回加权后的两个类别的概率

def predict(self, X):
"""
返回预测结果,基于加权后的预测概率判断类别。
:param X: 测试数据集
:return: 返回最终预测类别(0 或 1)
"""
weighted_proba = self.predict_proba(X)
return (weighted_proba[:, 1] >= 0.5).astype(int) # 如果加权概率 >= 0.5,则预测为类别1,否则为类别0

ensemble_model = joblib.load(f'{dtdir}/ensemble_model.pkl')

# SHAP
if True:

explainer = shap.TreeExplainer(RandomForest)
shap_values = explainer.shap_values(X_test)

# 创建 shap.Explanation 对象
shap_explanation = shap.Explanation(
shap_values[:, :, 1], # 绘制类别 1
base_values = explainer.expected_value,
data = X_test,
feature_names = X_test.columns
)

# 绘制热图
plt.figure(figsize = (10, 5), dpi = 1200)
shap.plots.heatmap(shap_explanation, show = False)
plt.savefig("SHAP_numpy Sorted Feature Importance.png", bbox_inches = 'tight')
plt.close()

# 排序特征
if True:

feature_names = X_test.columns

# 存储每个模型的特征重要性分数
model_importances = {}

# 定义模型
models = {
"DecisionTree": DecisionTree,
"GradientBoosting": GradientBoosting,
"LightGBM": LightGBM,
"RandomForest": RandomForest,
"XGBoost": XGBoost
}

# 提取每个模型的特征重要性分数
for model_name, model in models.items():
if hasattr(model, 'feature_importances_'):
model_importances[model_name] = model.feature_importances_

# 将字典转换为 DataFrame,按特征名和模型名排列
importance_df = pd.DataFrame(model_importances, index=feature_names)

# 为了避免出现0或1,可以对每个特征的值加上一个小的常量
importance_df += 1e-6 # 添加一个小的常量,避免归一化结果为0

# 使用 MinMaxScaler 归一化特征重要性分数
scaler = MinMaxScaler()
normalized_importance_df = pd.DataFrame(
scaler.fit_transform(importance_df),
columns = importance_df.columns,
index = importance_df.index
)

# 计算每一行的特征重要性之和,作为 SUM 列
normalized_importance_df['SUM'] = normalized_importance_df.sum(axis = 1)

# 根据 SUM 列的值进行排名,使用 sort_values 按照 SUM 排序,并且加上 Rank 列
normalized_importance_df['Rank'] = normalized_importance_df['SUM'].rank(ascending = False, method = 'min')

# 根据 Rank 列进行排序
normalized_importance_df = normalized_importance_df.sort_values(by = 'Rank', ascending = True)
  • Titre: 通过 TOPSIS 赋权优化多模型融合的 SHAP 解释提升模型透明性
  • Auteur: Xing Abao
  • Créé à : 2025-10-28 10:59:48
  • Mis à jour à : 2025-10-28 22:40:55
  • Lien: https://bioinformatics.vip/2025/10/28/sad41d8cd/251028_Multi_Model_SHAP_TOPSIS_Fusion/
  • Licence: Cette œuvre est sous licence CC BY-NC-SA 4.0.
Commentaires