1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
|
# Standard library
import warnings

# Third-party
import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import shap
from imblearn.over_sampling import SMOTE
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler

# Global matplotlib defaults for every figure produced by this script:
# the font family, and a plain hyphen instead of the Unicode minus sign.
plt.rcParams.update({
    'font.family': 'Times New Roman',
    'axes.unicode_minus': False,
})

# Suppress all warnings for the rest of the run (installed after the
# third-party imports, so warnings raised while importing are unaffected).
warnings.filterwarnings("ignore")
if __name__ == '__main__':
    # Hard-coded locations: a working directory and the data directory
    # that holds the input workbook.
    wkdir = 'C:/Users/Admins/Desktop'
    dtdir = 'Z:/TData/big-data/sad41d8cd/251028_Multi_Model_SHAP_TOPSIS_Fusion'

    # Read the whole dataset from the Excel workbook in `dtdir`.
    df = pd.read_excel(f'{dtdir}/data.xlsx')

    # Quick inspection of the loaded frame. Only `df.info()` prints; the
    # values of the other two expressions are discarded when this runs as
    # a script.
    df.head()
    df.columns
    df.info()
# NOTE(review): this file's indentation was lost; the section is assumed to
# continue the script entry point, so it carries its own guard — confirm.
if __name__ == '__main__':
    # Separate the outcome column from the predictor columns.
    target_col = 'Electrical_cardioversion'
    y = df[target_col]
    X = df.drop(columns=[target_col])

    # Hold out 30 % of the rows for testing. The split is stratified on the
    # outcome and seeded so it is reproducible.
    X_train, X_test, y_train, y_test = train_test_split(
        X,
        y,
        test_size=0.3,
        random_state=42,
        stratify=y,
    )

    # Oversample the training part only; the test part is left untouched.
    # `sampling_strategy=1` presumably requests a 1:1 class ratio — confirm
    # against the imbalanced-learn documentation.
    smote = SMOTE(sampling_strategy=1, k_neighbors=20, random_state=42)
    X_SMOTE_train, y_SMOTE_train = smote.fit_resample(X_train, y_train)
# NOTE(review): this file's indentation was lost; the section is assumed to
# continue the script entry point, so it carries its own guard — confirm.
if __name__ == '__main__':
    # Pre-trained base learners, one pickle per model inside `dtdir`.
    # SECURITY: joblib.load unpickles arbitrary objects and can execute code
    # while doing so; only load files from a trusted location.
    BernoulliNB = joblib.load(f'{dtdir}/BernoulliNB.pkl')
    DecisionTree = joblib.load(f'{dtdir}/DecisionTree.pkl')
    GradientBoosting = joblib.load(f'{dtdir}/GradientBoosting.pkl')
    LightGBM = joblib.load(f'{dtdir}/LightGBM.pkl')
    RandomForest = joblib.load(f'{dtdir}/RandomForest.pkl')
    SVM = joblib.load(f'{dtdir}/SVM.pkl')
    XGBoost = joblib.load(f'{dtdir}/XGBoost.pkl')

    # Defined before `ensemble_model.pkl` is loaded; presumably that pickle
    # refers to this class by name — confirm.
    class EnsembleModel(BaseEstimator, ClassifierMixin):
        """Weighted soft-voting ensemble over a dict of base classifiers.

        The class probabilities of every base model are multiplied by that
        model's weight and summed. Only two classes are supported, and
        ``predict`` always returns the labels 0 and 1.
        """

        def __init__(self, models, weights):
            """Store the constructor arguments.

            :param models: dict holding the base learners
            :param weights: per-model weights (obtained from the TOPSIS
                percentage shares); the i-th weight is applied to the i-th
                model in the dict's iteration order
            """
            # Parameters are stored exactly as passed: scikit-learn's
            # get_params()/clone() require __init__ not to modify them
            # (wrapping `weights` in np.array made clone() raise). The
            # array conversion happens in predict_proba instead.
            self.models = models
            self.weights = weights

        def fit(self, X, y):
            """Fit every base learner on the same data.

            :param X: training features
            :param y: training labels
            :return: self
            """
            for model in self.models.values():
                model.fit(X, y)
            return self

        def predict_proba(self, X):
            """Return the weighted sum of the base learners' probabilities.

            :param X: samples to score; must expose ``shape``
            :return: array of shape (n_samples, 2) with the weighted
                probabilities of the two classes. The weights are applied
                as given and are not normalised here.
            """
            # Works for any array-like `weights`, including the ndarray
            # stored by instances pickled with the previous __init__.
            weights = np.asarray(self.weights)
            probas = np.zeros((X.shape[0], 2))
            for i, model in enumerate(self.models.values()):
                model_proba = np.asarray(model.predict_proba(X))
                # Explicit column list: a model without a second
                # probability column raises instead of broadcasting.
                probas += model_proba[:, [0, 1]] * weights[i]
            return probas

        def predict(self, X):
            """Return the predicted class for each sample.

            :param X: samples to classify
            :return: int array of 0/1 labels; 1 when the weighted
                probability of the second class is at least 0.5
            """
            weighted_proba = self.predict_proba(X)
            return (weighted_proba[:, 1] >= 0.5).astype(int)

    # Pre-trained ensemble saved alongside the base learners.
    ensemble_model = joblib.load(f'{dtdir}/ensemble_model.pkl')
# NOTE(review): this file's indentation was lost; the section is assumed to
# continue the script entry point, so it carries its own guard — confirm.
if __name__ == '__main__':
    # Compute SHAP values for the random-forest model on the held-out rows.
    explainer = shap.TreeExplainer(RandomForest)
    shap_values = explainer.shap_values(X_test)

    # Keep index 1 of the last axis. Presumably the array is laid out as
    # (samples, features, classes) and index 1 is the positive class —
    # confirm against the installed SHAP version.
    class1_shap = shap_values[:, :, 1]

    # NOTE(review): `explainer.expected_value` is passed whole while the
    # values above are sliced to index 1 — confirm whether the base value
    # should be sliced the same way.
    shap_explanation = shap.Explanation(
        class1_shap,
        base_values=explainer.expected_value,
        data=X_test,
        feature_names=X_test.columns,
    )

    # Draw the heatmap without displaying it, write it to the current
    # working directory, then close the figure.
    plt.figure(figsize=(10, 5), dpi=1200)
    shap.plots.heatmap(shap_explanation, show=False)
    plt.savefig(
        "SHAP_numpy Sorted Feature Importance.png",
        bbox_inches='tight',
    )
    plt.close()
# NOTE(review): this file's indentation was lost; the section is assumed to
# continue the script entry point, so it carries its own guard — confirm.
if __name__ == '__main__':
    feature_names = X_test.columns

    # The five models considered here; any that lacks a
    # `feature_importances_` attribute is skipped.
    models = {
        "DecisionTree": DecisionTree,
        "GradientBoosting": GradientBoosting,
        "LightGBM": LightGBM,
        "RandomForest": RandomForest,
        "XGBoost": XGBoost,
    }
    model_importances = {
        model_name: model.feature_importances_
        for model_name, model in models.items()
        if hasattr(model, 'feature_importances_')
    }

    # One row per feature, one column per model.
    importance_df = pd.DataFrame(model_importances, index=feature_names)
    # NOTE(review): this offset is added before min-max scaling, which is
    # shift-invariant, so it likely has no effect — confirm it is needed.
    importance_df += 1e-6

    # Rescale every model's column to [0, 1] so the models are comparable.
    scaler = MinMaxScaler()
    scaled_values = scaler.fit_transform(importance_df)

    # SUM adds the scaled importances across models; Rank orders features
    # by SUM (1 = largest, ties share the lowest rank); rows are sorted by
    # Rank.
    normalized_importance_df = (
        pd.DataFrame(
            scaled_values,
            columns=importance_df.columns,
            index=importance_df.index,
        )
        .assign(SUM=lambda frame: frame.sum(axis=1))
        .assign(
            Rank=lambda frame: frame['SUM'].rank(ascending=False, method='min')
        )
        .sort_values(by='Rank', ascending=True)
    )
|