从机器学习到因果推断的汽车性能数据分析与关系估计案例实践

Xing Abao Lv3

考虑哪些因素会影响汽车的重量

这个示例完整地展示了一个从数据预处理、机器学习预测建模、模型可解释性分析到高级因果推断的综合性数据分析流程。其核心目标不仅是构建一个能够准确预测汽车重量的机器学习模型,更是深入挖掘数据背后复杂的因果关系,探究诸如排量、马力等关键性能指标是如何从因果层面影响汽车重量的,从而超越传统的相关性分析,提供更具决策价值的洞见。

模拟案例

加载模块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import shap
import graphviz
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import dowhy
from dowhy import CausalModel
from sklearn.impute import KNNImputer
from sklearn.model_selection import train_test_split
from lightgbm import LGBMRegressor
from sklearn.model_selection import GridSearchCV
from sklearn import metrics
from causallearn.search.FCMBased import lingam
from causallearn.search.FCMBased.lingam.utils import make_dot
from IPython.display import Image, display

plt.rcParams['font.family'] = 'Times New Roman'
plt.rcParams['axes.unicode_minus'] = False

import warnings
warnings.filterwarnings("ignore")

加载数据

1
2
path = 'Z:/TData/big-data/sad41d8cd/251022_From_Prediction_to_Causal_Inference.xlsx'
df = pd.read_excel(path)
1
2
3
4
5
6
7
8
df.head()
Out[79]:
mpg cylinders displacement horsepower weight acceleration
0 18.0 8 307.0 130.0 3504 12.0
1 15.0 8 350.0 165.0 3693 11.5
2 18.0 8 318.0 150.0 3436 11.0
3 16.0 8 304.0 150.0 3433 12.0
4 17.0 8 302.0 140.0 3449 10.5
1
2
3
4
5
df.columns
Out[80]:
Index(['mpg', 'cylinders', 'displacement', 'horsepower', 'weight',
'acceleration'],
dtype='object')

缺失值处理

1
2
3
4
5
6
7
8
9
10
11
# 缺失值处理
print("缺失值:", df.isnull().sum().sum())
# 缺失值: 6

# 初始化 KNN 填补器,设置邻居数量为 5
knn_imputer = KNNImputer(n_neighbors = 5)
# 对数据进行填补
df = pd.DataFrame(knn_imputer.fit_transform(df), columns=df.columns)

print("填补后:", df.isnull().sum().sum())
# 填补后: 0

分析流程首先从数据准备开始,并立即检查了数据的完整性,发现存在 6 个缺失值。

为了保证数据质量,代码采用了 K-近邻 (KNN) 算法进行插补。这种方法通过寻找与缺失值样本最相似的 5 个邻居,并利用这些邻居的特征信息来估算和填充缺失值,相比于简单的均值或中位数填充,能够更好地保留数据的内在结构和分布。完成填充后,数据集变得完整。

训练集和测试集

1
2
3
4
5
6
7
8
9
10
11
# 划分特征和目标变量
X = df.drop(['weight'], axis = 1) # 从数据集中去掉目标变量 'y',得到特征变量 X
y = df['weight'] # 提取目标变量 y

# 划分训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(
X, # 特征变量
y, # 目标变量
test_size = 0.3, # 测试集所占比例,这里为 30%
random_state = 42 # 随机种子,确保结果可重复
)

接下来,进入了预测建模阶段。分析的目标被设定为预测汽车的重量weight,因此weight列被作为目标变量 y,其余所有列则作为特征变量 X。为了客观评估模型的性能,数据集被划分为 70% 的训练集和 30% 的测试集。

K 折交叉验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 初始化 LightGBM 回归模型
model_lgbm = LGBMRegressor(random_state = 42, verbose = -1)

# 定义参数网格
param_grid_lgbm = {
'n_estimators': [50, 100, 200], # 树的数量
'learning_rate': [0.01, 0.1, 0.2], # 学习率
'max_depth': [-1, 10, 20], # 最大深度
'num_leaves': [31, 50, 100], # 叶节点数
'min_child_samples': [10, 20, 30] # 最小叶节点样本数
}

# 使用 GridSearchCV 进行网格搜索和 k 折交叉验证
grid_search_lgbm = GridSearchCV(
estimator = model_lgbm,
param_grid = param_grid_lgbm,
scoring = 'neg_mean_squared_error', # 评价指标为负均方误差
cv = 5, # 5 折交叉验证
n_jobs = -1, # 并行计算
verbose = 1 # 输出详细进度信息
)

# 训练模型
grid_search_lgbm.fit(X_train, y_train)

# 使用最优参数训练模型
best_model_lgbm = grid_search_lgbm.best_estimator_
1
Fitting 5 folds for each of 243 candidates, totalling 1215 fits

模型选择了性能卓越且效率极高的 LightGBM 回归器。为了充分发挥模型的潜力,利用 网格搜索与交叉验证GridSearchCV技术进行超参数调优,系统性地搜索了包括树的数量、学习率、树的最大深度在内的多组参数组合,并通过 5 折交叉验证的方式,以负均方误差为标准,自动找出了最优的模型配置。

构建模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 对训练集进行预测
y_pred_train = best_model_lgbm.predict(X_train)
y_pred_train_list = y_pred_train.tolist()

# 对测试集进行预测
y_pred_test = best_model_lgbm.predict(X_test)
y_pred_test_list = y_pred_test.tolist()

# 计算训练集的指标
mse_train = metrics.mean_squared_error(y_train, y_pred_train_list)
rmse_train = np.sqrt(mse_train)
mae_train = metrics.mean_absolute_error(y_train, y_pred_train_list)
r2_train = metrics.r2_score(y_train, y_pred_train_list)

# 计算测试集的指标
mse_test = metrics.mean_squared_error(y_test, y_pred_test_list)
rmse_test = np.sqrt(mse_test)
mae_test = metrics.mean_absolute_error(y_test, y_pred_test_list)
r2_test = metrics.r2_score(y_test, y_pred_test_list)

print("训练集评价指标:")
print("均方误差 (MSE):", mse_train)
print("均方根误差 (RMSE):", rmse_train)
print("平均绝对误差 (MAE):", mae_train)
print("拟合优度 (R-squared):", r2_train)

print("\n测试集评价指标:")
print("均方误差 (MSE):", mse_test)
print("均方根误差 (RMSE):", rmse_test)
print("平均绝对误差 (MAE):", mae_test)
print("拟合优度 (R-squared):", r2_test)
1
2
3
4
5
6
7
8
9
10
11
训练集评价指标:
均方误差 (MSE): 11520.037808821813
均方根误差 (RMSE): 107.33143905129481
平均绝对误差 (MAE): 63.2052858667154
拟合优度 (R-squared): 0.9832598167973577

测试集评价指标:
均方误差 (MSE): 38909.86398028576
均方根误差 (RMSE): 197.255833830804
平均绝对误差 (MAE): 156.48301700530283
拟合优度 (R-squared): 0.9497136999682628

训练完成后,模型在测试集上取得了高达 0.9497 的 R-squared 值,这表明模型已经能够非常准确地预测汽车重量,成功学习到了各特征与重量之间的强相关关系。

模型可解释性

然而,一个高精度的预测模型往往像一个“黑箱”,虽然知道它有效,却不清楚其内部的决策逻辑。

为了解决这个问题,代码引入了SHAP(SHapley Additive exPlanations)进行模型可解释性分析。通过shap.TreeExplainer,代码计算了每个特征对每一次预测的具体贡献值。生成的 SHAP 条形图直观地展示了各特征的全局重要性,结果显示displacement(排量)是影响重量预测的最关键因素。而信息更丰富的 SHAP 摘要图则进一步揭示了特征影响的方向和模式:例如,图中高数值(红色)的displacement点几乎全部落在 SHAP 值为正的区域,而低数值(蓝色)的点则相反,这清晰地表明了排量越大,模型预测的重量就越重。

至此,不仅有了精准的预测,还理解了模型做出预测的依据,但这一切仍停留在相关性层面。

1
2
3
4
5
6
7
8
explainer = shap.TreeExplainer(best_model_lgbm)
shap_values = explainer.shap_values(X_test)

plt.figure(figsize = (10, 5))
shap.summary_plot(shap_values, X_test, plot_type = "bar", show = False)
plt.tight_layout()
plt.savefig('C:/Users/Administrator/Desktop/barplot.png', format = 'png',bbox_inches = 'tight', dpi = 1200)
plt.show()
1
2
3
4
plt.figure(figsize = (10, 5))
shap.summary_plot(shap_values , X_test, feature_names = X_test.columns, plot_type = "dot", show = False)
plt.savefig('C:/Users/Administrator/Desktop/dotplot.png', format = 'pdf',bbox_inches = 'tight', dpi = 1200)
plt.show()
1
2
df_causally = df[['displacement', 'horsepower', 'acceleration', 'weight']]
df_causally
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Out[108]: 
displacement horsepower acceleration weight
0 307.0 130.0 12.0 3504.0
1 350.0 165.0 11.5 3693.0
2 318.0 150.0 11.0 3436.0
3 304.0 150.0 12.0 3433.0
4 302.0 140.0 10.5 3449.0
.. ... ... ... ...
393 140.0 86.0 15.6 2790.0
394 97.0 52.0 24.6 2130.0
395 135.0 84.0 11.6 2295.0
396 120.0 79.0 18.6 2625.0
397 119.0 82.0 19.4 2720.0

[398 rows x 4 columns]

因果推断

为了探究问题的本质,即一个变量的改变是否以及在多大程度上“导致”了另一个变量的改变,代码进入了整个分析流程中最核心、最深入的部分:因果推断。这里展示了两种构建因果图的方法。

自动推断因果关系

第一种是数据驱动的因果发现,利用causallearn库中的 ICALiNGAM 算法,直接从数据中学习变量间的因果结构。该算法推断出一个有向无环图(DAG),显示horsepower(马力)是根源,影响着displacementacceleration(加速度),而这三者最终共同决定weight。基于这个学习到的因果图,代码使用dowhy库,遵循“识别-估计-反驳”的严谨框架来量化因果效应。例如,在估计displacementweight的因果效应时,dowhy首先识别出需要通过控制混杂变量horsepoweracceleration(后门调整)来消除偏见,然后估计出因果效应值约为5.78。最关键的是,代码进行了一系列反驳检验,如添加随机共同原因、安慰剂处理和数据子集检验,所有检验均以高p值通过,极大地增强了该因果估计值的可信度。同样稳健的分析流程也被应用于horsepoweracceleration

ICALiNGAM,主要通过数据学习因果结构,自动推断因果关系,特别是根据非高斯性来识别因果方向。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 使用数据的列名来生成标签
labels = [f'{col}' for i, col in enumerate(df_causally.columns)]

# 将数据转换为 numpy 数组
data = df_causally.to_numpy()

# 创建并初始化一个ICALiNGAM模型对象
model_lingam = lingam.ICALiNGAM()

# 使用数据拟合模型,学习因果关系
model_lingam.fit(data)

# 使用make_dot函数根据模型的邻接矩阵(adjacency matrix)生成因果图,labels为节点标签
dot_graph = make_dot(model_lingam.adjacency_matrix_, labels=labels)
dot_graph.render("C:/Users/Administrator/Desktop//lingam_causal_graph", format = "png")

构建因果图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# 定义一个函数用于创建因果图的可视化
def make_graph(adjacency_matrix, labels = None):

# 将邻接矩阵中大于 0.01 的元素标记为 True(表示存在边),否则为 False
idx = np.abs(adjacency_matrix) > 0.01

# 获取邻接矩阵中存在边的所有索引位置(i.e., 行和列的索引)
dirs = np.where(idx)

# 使用 graphviz 库创建一个有向图对象,图形引擎设置为'dot'
d = graphviz.Digraph(engine = 'dot')

# 如果提供了标签,则使用标签,否则默认使用 'x0', 'x1', ... 作为节点标签
names = labels if labels else [f'x{i}' for i in range(len(adjacency_matrix))]

# 为每个节点添加一个图节点
for name in names:
d.node(name)

# 遍历邻接矩阵中存在边的节点对,并为每一条边添加可视化表示
for to, from_, coef in zip(dirs[0], dirs[1], adjacency_matrix[idx]):
# 使用边的起点和终点以及边的系数(权重)来添加边
d.edge(names[from_], names[to], label=str(coef))

# 返回创建的因果图对象
return d

# 定义一个函数将输入的 Graphviz 图的字符串转换为有效的 DOT 格式
def str_to_dot(string):
'''
Converts input string from graphviz library to valid DOT graph format.
'''
# 去除多余的空格和换行符,并将多行转换为一个有效的DOT格式(每行末尾加分号)
graph = string.strip().replace('\n', ';').replace('\t', '')

# 移除不必要的字符,从而确保 DOT 格式的正确性
graph = graph[:9] + graph[10:-2] + graph[-1] # 删除字符串中的多余字符

# 返回符合 DOT 格式的字符串
return graph
1
2
3
4
5
np.set_printoptions(precision = 3, suppress = True)
np.random.seed(0)

# 使用 make_graph 函数根据 LiNGAM 模型的邻接矩阵生成图的 dot 格式
graph_dot = make_graph(model_lingam.adjacency_matrix_, labels = labels)

定义因果模型

displacement

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# 定义因果模型,指定 displacement 作为治疗变量
treatment_variable = 'displacement' # 治疗变量:displacement
outcome_variable = 'weight' # 结果变量:weight

# 创建因果模型对象
model = CausalModel(
data = df_causally, # 输入数据
treatment = treatment_variable, # 定义治疗变量
outcome = outcome_variable, # 定义结果(因变量)
graph = str_to_dot(graph_dot.source) # 将图的 dot 格式转换为 CausalModel 所需的格式
)

# 识别因果效应(Identification)
identified_estimand = model.identify_effect(proceed_when_unidentifiable = True)# 识别因果效应的估计量
print(identified_estimand) # 打印识别的因果效应估计量

# 估计因果效应(Estimation)
estimate = model.estimate_effect(
identified_estimand, # 使用之前识别的因果效应估计量
method_name = "backdoor.linear_regression", # 使用backdoor方法(通过线性回归估计)
control_value = 0, # 控制变量
treatment_value = 1, # 处理变量 检查当治疗变量为 1 时,结果变量如何变化
confidence_intervals = True, # 计算置信区间
test_significance = True # 进行显著性检验
)

# 打印因果效应估计值
print("Causal Estimate is " + str(estimate.value))

# 使用随机共同原因方法来反驳因果估算
refute1_results = model.refute_estimate(
identified_estimand,
estimate,
method_name = "random_common_cause" # 通过引入随机共同原因来测试因果估算的稳定性
)

# 打印反驳结果,查看因果估算是否受到随机共同原因的影响
print(refute1_results)

# 使用安慰剂处理反驳器来反驳因果估算
refute2_results = model.refute_estimate(
identified_estimand,
estimate,
method_name = "placebo_treatment_refuter" # 随机将一个协变量作为处理,并重新估算
)

# 打印反驳结果,查看安慰剂处理是否导致因果估算接近 0
print(refute2_results)

# 使用数据子集反驳器来反驳因果估算
refute3_results = model.refute_estimate(
identified_estimand,
estimate,
method_name = "data_subset_refuter" # 创建数据子集并检查因果估算的变化
)

# 打印反驳结果,查看因果估算在不同子集中的变化情况
print(refute3_results)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
Estimand type: nonparametric-ate

### Estimand : 1
Estimand name: backdoor
Estimand expression:
d
───────────────(E[weight|acceleration,horsepower])
d[displacement]
Estimand assumption 1, Unconfoundedness: If U→{displacement} and U→weight then P(weight|displacement,acceleration,horsepower,U) = P(weight|displacement,acceleration,horsepower)

### Estimand : 2
Estimand name: iv
No such variable(s) found!

### Estimand : 3
Estimand name: frontdoor
No such variable(s) found!

Causal Estimate is 5.782613864626683

Refute: Add a random common cause
Estimated effect:5.782613864626683
New effect:5.783766379819861
p value:0.92

Refute: Use a Placebo Treatment
Estimated effect:5.782613864626683
New effect:2.0463630789890885e-11
p value:0.0

Refute: Use a subset of data
Estimated effect:5.782613864626683
New effect:5.783483799091648
p value:0.9199999999999999

horsepower

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
treatment_variable = 'horsepower' 
outcome_variable = 'weight'

# 创建因果模型对象
model = CausalModel(
data = df_causally,
treatment = treatment_variable,
outcome = outcome_variable,
graph = str_to_dot(graph_dot.source)
)

# 识别因果效应
identified_estimand = model.identify_effect(proceed_when_unidentifiable = True)
print(identified_estimand)

# 估计因果效应
estimate = model.estimate_effect(
identified_estimand,
method_name = "backdoor.linear_regression",
control_value = 0,
treatment_value = 1,
confidence_intervals = True,
test_significance = True
)

# 打印因果效应估计值
print("Causal Estimate is " + str(estimate.value))

# 使用随机共同原因方法来反驳因果估算
refute1_results = model.refute_estimate(
identified_estimand,
estimate,
method_name = "random_common_cause"
)

# 打印反驳结果
print(refute1_results)

# 使用安慰剂处理反驳器来反驳因果估算
refute2_results = model.refute_estimate(
identified_estimand,
estimate,
method_name = "placebo_treatment_refuter"
)

# 打印反驳结果
print(refute2_results)

# 使用数据子集反驳器来反驳因果估算
refute3_results = model.refute_estimate(
identified_estimand,
estimate,
method_name = "data_subset_refuter"
)

# 打印反驳结果
print(refute3_results)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
Estimand type: nonparametric-ate

### Estimand : 1
Estimand name: backdoor
Estimand expression:
d
─────────────(E[weight])
d[horsepower]
Estimand assumption 1, Unconfoundedness: If U→{horsepower} and U→weight then P(weight|horsepower,,U) = P(weight|horsepower,)

### Estimand : 2
Estimand name: iv
No such variable(s) found!

### Estimand : 3
Estimand name: frontdoor
No such variable(s) found!

Causal Estimate is 19.102332110133716

Refute: Add a random common cause
Estimated effect:19.102332110133716
New effect:19.10351101858844
p value:0.98

Refute: Use a Placebo Treatment
Estimated effect:19.102332110133716
New effect:0.0
p value:1.0

Refute: Use a subset of data
Estimated effect:19.102332110133716
New effect:19.094103444274655
p value:0.8999999999999999

acceleration

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
treatment_variable = 'acceleration' 
outcome_variable = 'weight'

# 创建因果模型对象
model = CausalModel(
data = df_causally,
treatment = treatment_variable,
outcome = outcome_variable,
graph = str_to_dot(graph_dot.source)
)

# 识别因果效应
identified_estimand = model.identify_effect(proceed_when_unidentifiable = True)
print(identified_estimand)

# 估计因果效应
estimate = model.estimate_effect(
identified_estimand,
method_name = "backdoor.linear_regression",
control_value = 0,
treatment_value = 1,
confidence_intervals = True,
test_significance = True
)

# 打印因果效应估计值
print("Causal Estimate is " + str(estimate.value))

# 使用随机共同原因方法来反驳因果估算
refute1_results = model.refute_estimate(
identified_estimand,
estimate,
method_name = "random_common_cause"
)

# 打印反驳结果
print(refute1_results)

# 使用安慰剂处理反驳器来反驳因果估算
refute2_results = model.refute_estimate(
identified_estimand,
estimate,
method_name = "placebo_treatment_refuter"
)

# 打印反驳结果
print(refute2_results)

# 使用数据子集反驳器来反驳因果估算
refute3_results = model.refute_estimate(
identified_estimand,
estimate,
method_name = "data_subset_refuter"
)

# 打印反驳结果
print(refute3_results)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
Estimand type: nonparametric-ate

### Estimand : 1
Estimand name: backdoor
Estimand expression:
d
───────────────(E[weight|horsepower])
d[acceleration]
Estimand assumption 1, Unconfoundedness: If U→{acceleration} and U→weight then P(weight|acceleration,horsepower,U) = P(weight|acceleration,horsepower)

### Estimand : 2
Estimand name: iv
No such variable(s) found!

### Estimand : 3
Estimand name: frontdoor
No such variable(s) found!

Causal Estimate is 104.36913948971551

Refute: Add a random common cause
Estimated effect:104.36913948971551
New effect:104.3604588867247
p value:0.98

Refute: Use a Placebo Treatment
Estimated effect:104.36913948971551
New effect:0.0
p value:1.0

Refute: Use a subset of data
Estimated effect:104.36913948971551
New effect:104.43880782725017
p value:0.98

根据领域知识构建因果图

第二种方法是基于领域知识构建因果图。

手动定义了一个符合汽车工程学常识的因果图,其中horsepower作为核心动力源,直接或间接地影响其他变量和最终的重量。使用这个专家知识图,dowhy再次进行了因果效应估计。令人信服的是,基于这个图计算出的displacementweight的因果效应值(约为 5.86)与前一种数据驱动方法得到的结果高度一致,这两种不同路径得出的相似结论相互印证,使得最终的因果判断更加可靠和稳固。

总而言之,本示例演示了一个从现象到本质的完整数据科学探索之旅。它始于建立高精度的预测模型以捕捉变量间的相关性,通过 SHAP 解释模型决策以理解这些相关性如何被利用,最终通过两种互补的因果推断方法(数据驱动和知识驱动)并结合严格的反驳检验,量化了变量之间纯粹的因果效应。

这体现了现代数据分析的核心思想:因果推断并非一蹴而就的自动化过程,而是在数据证据和领域知识之间反复迭代、验证和修正的严谨科学过程,其最终目标是产出经得起推敲、具有现实指导意义的可靠结论。

构建因果图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 定义因果图的 DOT 语言
causal_graph = """
digraph {
horsepower[label="Horsepower"];
displacement[label="Displacement"];
acceleration[label="Acceleration"];
weight[label="Weight"];

horsepower -> displacement;
horsepower -> acceleration;
displacement -> weight;
acceleration -> weight;
horsepower -> weight;
}
"""


# 创建 DoWhy 因果模型
model = dowhy.CausalModel(
data = df_causally, # 使用的数据集
graph = causal_graph.replace("\n", " "), # 将因果图中的换行符替换为空格,确保图形格式正确
treatment = "displacement", # 因果模型的处理变量(即干预变量)
outcome = 'weight' # 因果模型的结果变量(即输出变量)
)

# 可视化模型
model.view_model(file_name = 'C:/Users/Administrator/Desktop/causal_model')

# 显示生成的因果模型图
display(Image(filename = "C:/Users/Administrator/Desktop/causal_model.png"))

确定因果效应

1
2
3
4
5
# 确定因果效应
identified_estimand = model.identify_effect(proceed_when_unidentifiable=True)

# 打印已识别的因果效应估算
print(identified_estimand)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Estimand type: nonparametric-ate

### Estimand : 1
Estimand name: backdoor
Estimand expression:
d
───────────────(E[weight|horsepower])
d[displacement]
Estimand assumption 1, Unconfoundedness: If U→{displacement} and U→weight then P(weight|displacement,horsepower,U) = P(weight|displacement,horsepower)

### Estimand : 2
Estimand name: iv
No such variable(s) found!

### Estimand : 3
Estimand name: frontdoor
No such variable(s) found!

使用 backdoor 方法和倾向性评分加权法来估计因果效应

1
2
3
4
5
6
7
8
9
10
11
12
# 使用 backdoor 方法和倾向评分加权法来估算因果效应
estimate = model.estimate_effect(
identified_estimand,
method_name = "backdoor.linear_regression",
control_value = 0,
treatment_value = 1,
confidence_intervals = True,
test_significance = True
)

# 打印已识别的因果效应估算
print(estimate)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
*** Causal Estimate ***

## Identified estimand
Estimand type: nonparametric-ate

### Estimand : 1
Estimand name: backdoor
Estimand expression:
d
───────────────(E[weight|horsepower])
d[displacement]
Estimand assumption 1, Unconfoundedness: If U→{displacement} and U→weight then P(weight|displacement,horsepower,U) = P(weight|displacement,horsepower)

## Realized estimand
b: weight~displacement+horsepower+displacement*acceleration
Target units: ate

## Estimate
Mean value: 5.8561263193757895
p-value: [0.048]
95.0% confidence interval: (np.float64(5.368861566346823), np.float64(6.40796995735559))

### Conditional Estimates
__categorical__acceleration
(7.999, 13.5] 4.331409
(13.5, 14.8] 5.287807
(14.8, 16.0] 5.814947
(16.0, 17.76] 6.432153
(17.76, 24.8] 7.626946
dtype: float64
1
2
3
# 打印因果效应估计值
print("Causal Estimate is " + str(estimate.value))
# Causal Estimate is 5.8561263193757895

后续同样和前面一样进行各种检验。

整体总结

在因果推断和因果图的构建过程中,是一个迭代的过程,通常需要在领域知识和数据分析结果之间不断地循环反馈,直到得到一个合理且可靠的因果图。这一过程的关键在于不断通过各种方法进行检验和调整。 所以不是单纯的更换数据集直接跑代码。

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
# -*- coding: utf-8 -*-

import shap
import graphviz
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import dowhy
from dowhy import CausalModel
from sklearn.impute import KNNImputer
from sklearn.model_selection import train_test_split
from lightgbm import LGBMRegressor
from sklearn.model_selection import GridSearchCV
from sklearn import metrics
from causallearn.search.FCMBased import lingam
from causallearn.search.FCMBased.lingam.utils import make_dot
from IPython.display import Image, display

plt.rcParams['font.family'] = 'Times New Roman'
plt.rcParams['axes.unicode_minus'] = False

import warnings
warnings.filterwarnings("ignore")

# 定义一个函数用于创建因果图的可视化
def make_graph(adjacency_matrix, labels = None):

# 将邻接矩阵中大于 0.01 的元素标记为 True(表示存在边),否则为 False
idx = np.abs(adjacency_matrix) > 0.01

# 获取邻接矩阵中存在边的所有索引位置(i.e., 行和列的索引)
dirs = np.where(idx)

# 使用 graphviz 库创建一个有向图对象,图形引擎设置为'dot'
d = graphviz.Digraph(engine = 'dot')

# 如果提供了标签,则使用标签,否则默认使用 'x0', 'x1', ... 作为节点标签
names = labels if labels else [f'x{i}' for i in range(len(adjacency_matrix))]

# 为每个节点添加一个图节点
for name in names:
d.node(name)

# 遍历邻接矩阵中存在边的节点对,并为每一条边添加可视化表示
for to, from_, coef in zip(dirs[0], dirs[1], adjacency_matrix[idx]):
# 使用边的起点和终点以及边的系数(权重)来添加边
d.edge(names[from_], names[to], label=str(coef))

# 返回创建的因果图对象
return d

# 定义一个函数将输入的 Graphviz 图的字符串转换为有效的 DOT 格式
def str_to_dot(string):
'''
Converts input string from graphviz library to valid DOT graph format.
'''
# 去除多余的空格和换行符,并将多行转换为一个有效的DOT格式(每行末尾加分号)
graph = string.strip().replace('\n', ';').replace('\t', '')

# 移除不必要的字符,从而确保 DOT 格式的正确性
graph = graph[:9] + graph[10:-2] + graph[-1] # 删除字符串中的多余字符

# 返回符合 DOT 格式的字符串
return graph

if __name__ == '__main__':

path = 'Z:/TData/big-data/sad41d8cd/251022_From_Prediction_to_Causal_Inference.xlsx'
df = pd.read_excel(path)

if False:
df.head()
df.columns

# 缺失值处理
if True:

print("缺失值:", df.isnull().sum().sum())

# 初始化 KNN 填补器,设置邻居数量为 5
knn_imputer = KNNImputer(n_neighbors = 5)
# 对数据进行填补
df = pd.DataFrame(knn_imputer.fit_transform(df), columns=df.columns)

print("填补后:", df.isnull().sum().sum())

# 训练集和测试集
if True:

# 划分特征和目标变量
X = df.drop(['weight'], axis = 1) # 从数据集中去掉目标变量 'y',得到特征变量 X
y = df['weight'] # 提取目标变量 y

# 划分训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(
X, # 特征变量
y, # 目标变量
test_size = 0.3, # 测试集所占比例,这里为 30%
random_state = 42 # 随机种子,确保结果可重复
)

# K 折交叉验证
if True:

# 初始化 LightGBM 回归模型
model_lgbm = LGBMRegressor(random_state = 42, verbose = -1)

# 定义参数网格
param_grid_lgbm = {
'n_estimators': [50, 100, 200], # 树的数量
'learning_rate': [0.01, 0.1, 0.2], # 学习率
'max_depth': [-1, 10, 20], # 最大深度
'num_leaves': [31, 50, 100], # 叶节点数
'min_child_samples': [10, 20, 30] # 最小叶节点样本数
}

# 使用 GridSearchCV 进行网格搜索和 k 折交叉验证
grid_search_lgbm = GridSearchCV(
estimator = model_lgbm,
param_grid = param_grid_lgbm,
scoring = 'neg_mean_squared_error', # 评价指标为负均方误差
cv = 5, # 5 折交叉验证
n_jobs = -1, # 并行计算
verbose = 1 # 输出详细进度信息
)

# 训练模型
grid_search_lgbm.fit(X_train, y_train)

# 使用最优参数训练模型
best_model_lgbm = grid_search_lgbm.best_estimator_

# 构建模型
if True:

# 对训练集进行预测
y_pred_train = best_model_lgbm.predict(X_train)
y_pred_train_list = y_pred_train.tolist()

# 对测试集进行预测
y_pred_test = best_model_lgbm.predict(X_test)
y_pred_test_list = y_pred_test.tolist()

# 计算训练集的指标
mse_train = metrics.mean_squared_error(y_train, y_pred_train_list)
rmse_train = np.sqrt(mse_train)
mae_train = metrics.mean_absolute_error(y_train, y_pred_train_list)
r2_train = metrics.r2_score(y_train, y_pred_train_list)

# 计算测试集的指标
mse_test = metrics.mean_squared_error(y_test, y_pred_test_list)
rmse_test = np.sqrt(mse_test)
mae_test = metrics.mean_absolute_error(y_test, y_pred_test_list)
r2_test = metrics.r2_score(y_test, y_pred_test_list)

print("训练集评价指标:")
print("均方误差 (MSE):", mse_train)
print("均方根误差 (RMSE):", rmse_train)
print("平均绝对误差 (MAE):", mae_train)
print("拟合优度 (R-squared):", r2_train)

print("\n测试集评价指标:")
print("均方误差 (MSE):", mse_test)
print("均方根误差 (RMSE):", rmse_test)
print("平均绝对误差 (MAE):", mae_test)
print("拟合优度 (R-squared):", r2_test)

# 模型可解释性
if True:

explainer = shap.TreeExplainer(best_model_lgbm)
shap_values = explainer.shap_values(X_test)

plt.figure(figsize = (10, 5))
shap.summary_plot(shap_values, X_test, plot_type = "bar", show = False)
plt.tight_layout()
plt.savefig('C:/Users/Administrator/Desktop/barplot.png', format = 'png',bbox_inches = 'tight', dpi = 1200)
plt.show()

plt.figure(figsize = (10, 5))
shap.summary_plot(shap_values , X_test, feature_names = X_test.columns, plot_type = "dot", show = False)
plt.savefig('C:/Users/Administrator/Desktop/dotplot.png', format = 'png',bbox_inches = 'tight', dpi = 1200)
plt.show()

df_causally = df[['displacement', 'horsepower', 'acceleration', 'weight']]
df_causally

# 自动推断因果关系
if True:

# 使用数据的列名来生成标签
labels = [f'{col}' for i, col in enumerate(df_causally.columns)]

# 将数据转换为 numpy 数组
data = df_causally.to_numpy()

# 创建并初始化一个ICALiNGAM模型对象
model_lingam = lingam.ICALiNGAM()

# 使用数据拟合模型,学习因果关系
model_lingam.fit(data)

# 使用make_dot函数根据模型的邻接矩阵(adjacency matrix)生成因果图,labels为节点标签
dot_graph = make_dot(model_lingam.adjacency_matrix_, labels=labels)
dot_graph.render("C:/Users/Administrator/Desktop//lingam_causal_graph", format = "png")

if True:

np.set_printoptions(precision = 3, suppress = True)
np.random.seed(0)

# 使用 make_graph 函数根据 LiNGAM 模型的邻接矩阵生成图的 dot 格式
graph_dot = make_graph(model_lingam.adjacency_matrix_, labels = labels)


# 定义因果模型, 指定 displacement 作为治疗变量
if True:

# 定义因果模型,指定 displacement 作为治疗变量
treatment_variable = 'displacement' # 治疗变量:displacement
outcome_variable = 'weight' # 结果变量:weight

# 创建因果模型对象
model = CausalModel(
data = df_causally, # 输入数据
treatment = treatment_variable, # 定义治疗变量
outcome = outcome_variable, # 定义结果(因变量)
graph = str_to_dot(graph_dot.source) # 将图的 dot 格式转换为 CausalModel 所需的格式
)

# 识别因果效应(Identification)
identified_estimand = model.identify_effect(proceed_when_unidentifiable = True) # 识别因果效应的估计量
print(identified_estimand) # 打印识别的因果效应估计量

# 估计因果效应(Estimation)
estimate = model.estimate_effect(
identified_estimand, # 使用之前识别的因果效应估计量
method_name = "backdoor.linear_regression", # 使用backdoor方法(通过线性回归估计)
control_value = 0, # 控制变量
treatment_value = 1, # 处理变量 检查当治疗变量为 1 时,结果变量如何变化
confidence_intervals = True, # 计算置信区间
test_significance = True # 进行显著性检验
)

# 打印因果效应估计值
print("Causal Estimate is " + str(estimate.value))

# 使用随机共同原因方法来反驳因果估算
refute1_results = model.refute_estimate(
identified_estimand,
estimate,
method_name = "random_common_cause" # 通过引入随机共同原因来测试因果估算的稳定性
)

# 打印反驳结果,查看因果估算是否受到随机共同原因的影响
print(refute1_results)

# 使用安慰剂处理反驳器来反驳因果估算
refute2_results = model.refute_estimate(
identified_estimand,
estimate,
method_name = "placebo_treatment_refuter" # 随机将一个协变量作为处理,并重新估算
)

# 打印反驳结果,查看安慰剂处理是否导致因果估算接近 0
print(refute2_results)

# 使用数据子集反驳器来反驳因果估算
refute3_results = model.refute_estimate(
identified_estimand,
estimate,
method_name = "data_subset_refuter" # 创建数据子集并检查因果估算的变化
)

# 打印反驳结果,查看因果估算在不同子集中的变化情况
print(refute3_results)

# 定义因果模型, 指定 horsepower 作为治疗变量
if True:

treatment_variable = 'horsepower'
outcome_variable = 'weight'

# 创建因果模型对象
model = CausalModel(
data = df_causally,
treatment = treatment_variable,
outcome = outcome_variable,
graph = str_to_dot(graph_dot.source)
)

# 识别因果效应
identified_estimand = model.identify_effect(proceed_when_unidentifiable = True)
print(identified_estimand)

# 估计因果效应
estimate = model.estimate_effect(
identified_estimand,
method_name = "backdoor.linear_regression",
control_value = 0,
treatment_value = 1,
confidence_intervals = True,
test_significance = True
)

# 打印因果效应估计值
print("Causal Estimate is " + str(estimate.value))

# 使用随机共同原因方法来反驳因果估算
refute1_results = model.refute_estimate(
identified_estimand,
estimate,
method_name = "random_common_cause"
)

# 打印反驳结果
print(refute1_results)

# 使用安慰剂处理反驳器来反驳因果估算
refute2_results = model.refute_estimate(
identified_estimand,
estimate,
method_name = "placebo_treatment_refuter"
)

# 打印反驳结果
print(refute2_results)

# 使用数据子集反驳器来反驳因果估算
refute3_results = model.refute_estimate(
identified_estimand,
estimate,
method_name = "data_subset_refuter"
)

# 打印反驳结果
print(refute3_results)


# 定义因果模型, 指定 acceleration 作为治疗变量
if True:

treatment_variable = 'acceleration'
outcome_variable = 'weight'

# 创建因果模型对象
model = CausalModel(
data = df_causally,
treatment = treatment_variable,
outcome = outcome_variable,
graph = str_to_dot(graph_dot.source)
)

# 识别因果效应
identified_estimand = model.identify_effect(proceed_when_unidentifiable = True)
print(identified_estimand)

# 估计因果效应
estimate = model.estimate_effect(
identified_estimand,
method_name = "backdoor.linear_regression",
control_value = 0,
treatment_value = 1,
confidence_intervals = True,
test_significance = True
)

# 打印因果效应估计值
print("Causal Estimate is " + str(estimate.value))

# 使用随机共同原因方法来反驳因果估算
refute1_results = model.refute_estimate(
identified_estimand,
estimate,
method_name = "random_common_cause"
)

# 打印反驳结果
print(refute1_results)

# 使用安慰剂处理反驳器来反驳因果估算
refute2_results = model.refute_estimate(
identified_estimand,
estimate,
method_name = "placebo_treatment_refuter"
)

# 打印反驳结果
print(refute2_results)

# 使用数据子集反驳器来反驳因果估算
refute3_results = model.refute_estimate(
identified_estimand,
estimate,
method_name = "data_subset_refuter"
)

# 打印反驳结果
print(refute3_results)

# 根据领域知识构建因果图
if True:

# 定义因果图的 DOT 语言
causal_graph = """
digraph {
horsepower[label="Horsepower"];
displacement[label="Displacement"];
acceleration[label="Acceleration"];
weight[label="Weight"];

horsepower -> displacement;
horsepower -> acceleration;
displacement -> weight;
acceleration -> weight;
horsepower -> weight;
}
"""

# 创建 DoWhy 因果模型
model = dowhy.CausalModel(
data = df_causally, # 使用的数据集
graph = causal_graph.replace("\n", " "), # 将因果图中的换行符替换为空格,确保图形格式正确
treatment = "displacement", # 因果模型的处理变量(即干预变量)
outcome = 'weight' # 因果模型的结果变量(即输出变量)
)

# 可视化模型
model.view_model(file_name = 'C:/Users/Administrator/Desktop/causal_model')

# 显示生成的因果模型图
display(Image(filename = "C:/Users/Administrator/Desktop/causal_model.png"))

# 确定因果效应
identified_estimand = model.identify_effect(proceed_when_unidentifiable = True)

# 打印已识别的因果效应估算
print(identified_estimand)

# 使用 backdoor 方法和倾向评分加权法来估算因果效应
estimate = model.estimate_effect(
identified_estimand,
method_name = "backdoor.linear_regression",
control_value = 0,
treatment_value = 1,
confidence_intervals = True,
test_significance = True
)

print(estimate)

# 打印因果效应估计值
print("Causal Estimate is " + str(estimate.value))
  • Titre: 从机器学习到因果推断的汽车性能数据分析与关系估计案例实践
  • Auteur: Xing Abao
  • Créé à : 2025-10-22 12:39:13
  • Mis à jour à : 2025-10-27 11:25:24
  • Lien: https://bioinformatics.vip/2025/10/22/sad41d8cd/251022_From_Prediction_to_Causal_Inference/
  • Licence: Cette œuvre est sous licence CC BY-NC-SA 4.0.
Commentaires