机器学习完整流程指南:从数据到部署
机器学习完整流程指南
本教程将带你完成一个完整的机器学习项目,从数据获取到模型部署的全过程。
项目概述:房价预测
我们将使用加州房价数据集(波士顿房价数据集已从 scikit-learn 中移除,故不再使用)来预测房屋价格。
1. 环境设置
# 导入必要库
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.linear_model import LinearRegression, Ridge, Lasso
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
import joblib
import warnings
# Blanket-suppress all warnings for cleaner tutorial output.
# NOTE(review): avoid this in production code — it hides deprecations and bugs.
warnings.filterwarnings('ignore')
# Configure matplotlib to render CJK glyphs (SimHei, with DejaVu Sans fallback)
# and to draw the minus sign correctly when a CJK font is active.
plt.rcParams['font.sans-serif'] = ['SimHei', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False
2. 数据收集与探索
2.1 加载数据
# Load the California housing dataset (the Boston dataset was removed from scikit-learn).
from sklearn.datasets import fetch_california_housing
housing = fetch_california_housing()
# Features as a DataFrame; target is the median house value (in units of $100k).
X = pd.DataFrame(housing.data, columns=housing.feature_names)
y = pd.Series(housing.target, name='MedHouseVal')
print(f"数据集形状: {X.shape}")
print(f"特征名称: {X.columns.tolist()}")
2.2 数据探索
def explore_data(X, y):
    """Summarize the feature matrix and visualize the target and correlations.

    Prints dtype/memory info, descriptive statistics and missing-value counts,
    then shows a histogram of the target next to a feature-correlation heatmap.
    Returns X unchanged so the call site can reassign it.
    """
    print("=== 数据概览 ===")
    print(X.info())
    print("\n=== 描述性统计 ===")
    print(X.describe())
    print("\n=== 缺失值检查 ===")
    print(X.isnull().sum())
    print("\n=== 目标变量分布 ===")
    figure, (ax_target, ax_corr) = plt.subplots(1, 2, figsize=(12, 4))
    # Left panel: distribution of house prices.
    ax_target.hist(y, bins=50, edgecolor='black', alpha=0.7)
    ax_target.set_xlabel('房屋价格')
    ax_target.set_ylabel('频数')
    ax_target.set_title('目标变量分布')
    # Right panel: pairwise correlations between features.
    sns.heatmap(X.corr(), annot=True, fmt='.2f', cmap='coolwarm',
                center=0, ax=ax_corr)
    ax_corr.set_title('特征相关性热图')
    plt.tight_layout()
    plt.show()
    return X
# Run the exploratory analysis; explore_data returns X unchanged.
X = explore_data(X, y)
3. 数据预处理
3.1 处理异常值
def handle_outliers(df, columns, method='iqr', threshold=1.5):
    """Clip outliers in the given columns to IQR-based bounds.

    Parameters
    ----------
    df : pd.DataFrame
        Input data; not modified — a clipped copy is returned.
    columns : iterable of str
        Numeric columns to process.
    method : str
        Only 'iqr' is supported. The original silently did nothing for any
        other value; now an unknown method raises ValueError.
    threshold : float
        IQR multiplier defining the clipping bounds.

    Returns
    -------
    pd.DataFrame
        Copy of ``df`` with each listed column clamped to
        [Q1 - threshold*IQR, Q3 + threshold*IQR].
    """
    if method != 'iqr':
        raise ValueError(f"Unsupported outlier method: {method!r}")
    df_clean = df.copy()
    for col in columns:
        q1 = df[col].quantile(0.25)
        q3 = df[col].quantile(0.75)
        iqr = q3 - q1
        # Clamp values outside the IQR fence to the fence itself
        # (winsorize rather than drop, so row count is preserved).
        df_clean[col] = df[col].clip(q1 - threshold * iqr, q3 + threshold * iqr)
    return df_clean
# Clip outliers in every feature (all columns in this dataset are numeric).
numeric_features = X.columns.tolist()
X_clean = handle_outliers(X, numeric_features)
3.2 特征工程
def create_features(df):
    """Add engineered features derived from the California housing columns.

    Returns a copy of *df* with three extra columns; the input is not modified.

    Fix: the original divided by ``df['Households']``, but the California
    housing dataset has no ``Households`` column (features are MedInc,
    HouseAge, AveRooms, AveBedrms, Population, AveOccup, Latitude, Longitude),
    so it raised KeyError. Since Population / households == AveOccup, the
    original ratio ``Population / Households`` is exactly ``AveOccup``.
    """
    df_new = df.copy()
    # NOTE(review): AveRooms * HouseAge is labelled "total rooms" in the
    # original tutorial but the product has no such meaning — kept as-is
    # for continuity with the original feature set.
    df_new['TotalRooms'] = df['AveRooms'] * df['HouseAge']
    # Rooms relative to household occupancy; +1 guards against division by zero.
    df_new['RoomsPerHousehold'] = df['AveRooms'] / (df['AveOccup'] + 1)
    # Income relative to room count; +1 guards against division by zero.
    df_new['IncomePerRoom'] = df['MedInc'] / (df['AveRooms'] + 1)
    return df_new
# Apply feature engineering on the outlier-clipped data.
X_engineered = create_features(X_clean)
3.3 数据分割与标准化
# Split into train/test sets (fixed seed for reproducibility).
X_train, X_test, y_train, y_test = train_test_split(
    X_engineered, y, test_size=0.2, random_state=42
)
print(f"训练集大小: {X_train.shape}")
print(f"测试集大小: {X_test.shape}")
# Standardize features: fit on the training set only to avoid test-set leakage.
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# Rebuild DataFrames, preserving the original row index so the scaled features
# stay aligned with y_train / y_test (the original dropped the index, which
# would misalign any later index-based pandas operation).
X_train_scaled = pd.DataFrame(X_train_scaled, columns=X_train.columns, index=X_train.index)
X_test_scaled = pd.DataFrame(X_test_scaled, columns=X_test.columns, index=X_test.index)
4. 模型训练与评估
4.1 定义多个模型进行比较
# Candidate regressors: three linear baselines and two tree ensembles,
# all with fixed seeds where randomness is involved.
models = {
    'Linear Regression': LinearRegression(),
    'Ridge Regression': Ridge(alpha=1.0),
    'Lasso Regression': Lasso(alpha=0.01),
    'Random Forest': RandomForestRegressor(n_estimators=100, random_state=42),
    'Gradient Boosting': GradientBoostingRegressor(n_estimators=100, random_state=42)
}
def evaluate_models(models, X_train, X_test, y_train, y_test):
    """Fit each candidate model and report train/test/CV performance.

    For every (name, estimator) pair: fits on the training split, computes
    MSE and R² on both splits plus 5-fold cross-validated R² on the training
    split, prints a short report, and collects everything per model.

    Returns a dict keyed by model name, each value holding the fitted model
    and its metrics.
    """
    results = {}
    for name, model in models.items():
        model.fit(X_train, y_train)
        pred_train = model.predict(X_train)
        pred_test = model.predict(X_test)
        # Cross-validate on the training split only (test split stays held out).
        cv_scores = cross_val_score(model, X_train, y_train,
                                    cv=5, scoring='r2')
        metrics = {
            'model': model,
            'train_mse': mean_squared_error(y_train, pred_train),
            'test_mse': mean_squared_error(y_test, pred_test),
            'train_r2': r2_score(y_train, pred_train),
            'test_r2': r2_score(y_test, pred_test),
            'cv_mean': cv_scores.mean(),
            'cv_std': cv_scores.std(),
        }
        results[name] = metrics
        print(f"\n{name}:")
        print(f" 训练集 R²: {metrics['train_r2']:.4f}")
        print(f" 测试集 R²: {metrics['test_r2']:.4f}")
        print(f" 交叉验证 R²: {cv_scores.mean():.4f} (±{cv_scores.std():.4f})")
    return results
# Evaluate all candidate models on the scaled splits.
results = evaluate_models(models, X_train_scaled, X_test_scaled, y_train, y_test)
4.2 可视化模型比较
def visualize_results(results):
    """Draw a 2x2 dashboard comparing the evaluated models.

    Panels: (1) test-set R² per model, (2) train vs test R² bars,
    (3) feature importances of the best model when available,
    (4) residual plot of the best model.

    NOTE(review): panels 3 and 4 read the module-level globals X_train,
    X_test_scaled and y_test rather than taking them as parameters.
    """
    fig, axes = plt.subplots(2, 2, figsize=(14, 10))
    # 1. Test-set R² comparison.
    model_names = list(results.keys())
    test_r2_values = [results[name]['test_r2'] for name in model_names]
    axes[0, 0].barh(model_names, test_r2_values, color='skyblue')
    axes[0, 0].set_xlabel('测试集 R²')
    axes[0, 0].set_title('模型性能比较(测试集R²)')
    axes[0, 0].axvline(x=0, color='gray', linestyle='--', alpha=0.5)
    # 2. Train vs test R² — a large gap suggests overfitting.
    train_r2_values = [results[name]['train_r2'] for name in model_names]
    x = np.arange(len(model_names))
    width = 0.35
    axes[0, 1].bar(x - width/2, train_r2_values, width, label='训练集', color='lightblue')
    axes[0, 1].bar(x + width/2, test_r2_values, width, label='测试集', color='lightcoral')
    axes[0, 1].set_xlabel('模型')
    axes[0, 1].set_ylabel('R²')
    axes[0, 1].set_title('训练集 vs 测试集 R²')
    axes[0, 1].set_xticks(x)
    axes[0, 1].set_xticklabels(model_names, rotation=45)
    axes[0, 1].legend()
    # Best model by test R² — computed once (the original recomputed the
    # identical values a second time before the residual panel).
    best_model_name = max(results.keys(), key=lambda x: results[x]['test_r2'])
    best_model = results[best_model_name]['model']
    # 3. Feature importances, only when the best model exposes them
    # (tree ensembles do; linear models do not).
    if hasattr(best_model, 'feature_importances_'):
        importances = best_model.feature_importances_
        indices = np.argsort(importances)[::-1]
        axes[1, 0].bar(range(X_train.shape[1]), importances[indices])
        axes[1, 0].set_xlabel('特征排名')
        axes[1, 0].set_ylabel('重要性')
        axes[1, 0].set_title(f'{best_model_name} 特征重要性')
        axes[1, 0].set_xticks(range(X_train.shape[1]))
        axes[1, 0].set_xticklabels(X_train.columns[indices], rotation=90)
    # 4. Residuals vs predictions for the best model.
    y_pred = best_model.predict(X_test_scaled)
    residuals = y_test - y_pred
    axes[1, 1].scatter(y_pred, residuals, alpha=0.5)
    axes[1, 1].axhline(y=0, color='red', linestyle='--')
    axes[1, 1].set_xlabel('预测值')
    axes[1, 1].set_ylabel('残差')
    axes[1, 1].set_title(f'{best_model_name} 残差图')
    plt.tight_layout()
    plt.show()
# Render the comparison dashboard for the evaluated models.
visualize_results(results)
5. 模型优化
5.1 超参数调优
from sklearn.model_selection import GridSearchCV
def tune_random_forest(X_train, y_train):
    """Grid-search Random Forest hyperparameters with 5-fold CV on R².

    Prints the winning parameter combination and its CV score, then returns
    the estimator refitted on the full training split with those parameters.
    """
    search = GridSearchCV(
        estimator=RandomForestRegressor(random_state=42),
        param_grid={
            'n_estimators': [100, 200, 300],
            'max_depth': [10, 20, 30, None],
            'min_samples_split': [2, 5, 10],
            'min_samples_leaf': [1, 2, 4],
            'max_features': ['sqrt', 'log2'],
        },
        cv=5,
        scoring='r2',
        n_jobs=-1,  # use all cores; the grid is large (216 combinations x 5 folds)
        verbose=1,
    )
    search.fit(X_train, y_train)
    print(f"最佳参数: {search.best_params_}")
    print(f"最佳交叉验证得分: {search.best_score_:.4f}")
    return search.best_estimator_
# Pick the best model by test-set R² and tune it when it is the Random Forest.
best_model_name = max(results.keys(), key=lambda x: results[x]['test_r2'])
print(f"\n对 {best_model_name} 进行超参数调优...")
# NOTE(review): only the Random Forest branch is handled; if another model
# wins, optimized_model is never defined (it is also unused below).
if best_model_name == 'Random Forest':
    optimized_model = tune_random_forest(X_train_scaled, y_train)
6. 模型部署
6.1 保存模型
def save_model_pipeline(model, scaler, feature_names, model_name='best_model'):
    """Persist the model bundle (.pkl) and a JSON metadata file.

    Parameters
    ----------
    model : fitted estimator to persist.
    scaler : fitted scaler used on the training data.
    feature_names : list of str — column order expected at prediction time.
    model_name : str — basename for ``<name>.pkl`` and ``<name>_metadata.json``.

    Returns
    -------
    dict
        The bundle that was written to the .pkl file.
    """
    import joblib
    import datetime
    import json
    # Everything needed at prediction time travels together in one bundle.
    model_info = {
        'model': model,
        'scaler': scaler,
        'feature_names': feature_names,
        'created_at': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        'version': '1.0'
    }
    joblib.dump(model_info, f'{model_name}.pkl')
    print(f"模型已保存为 {model_name}.pkl")
    # Fix: the original dumped results[best_model_name] verbatim, whose
    # 'model' entry is an estimator object — json.dump raises TypeError on it.
    # Keep only JSON-serializable scalar metrics, and tolerate the globals
    # being absent when this helper is reused standalone.
    try:
        raw_perf = results.get(best_model_name, {})
    except NameError:
        raw_perf = {}
    performance = {k: float(v) for k, v in raw_perf.items()
                   if isinstance(v, (int, float))}
    metadata = {
        'model_type': type(model).__name__,
        'features': feature_names,
        'created_at': model_info['created_at'],
        'performance': performance
    }
    with open(f'{model_name}_metadata.json', 'w') as f:
        json.dump(metadata, f, indent=2)
    return model_info
# Persist the winning model together with its scaler and feature order.
best_model_name = max(results.keys(), key=lambda x: results[x]['test_r2'])
best_model = results[best_model_name]['model']
model_info = save_model_pipeline(
    best_model, scaler, X_train.columns.tolist(),
    model_name='housing_price_predictor'
)
6.2 创建预测API示例
def create_prediction_api_example():
    """Write a standalone prediction-API example script to predictor_api.py.

    The generated script loads the persisted model bundle and exposes single
    and batch prediction helpers.

    Fixes: (1) the generated preprocess_input now derives the engineered
    training-time columns (TotalRooms, RoomsPerHousehold, IncomePerRoom) when
    the caller supplies only the 8 raw dataset features — previously
    ``df[self.feature_names]`` raised KeyError for such input; (2) the file is
    written with an explicit utf-8 encoding, since the example contains
    non-ASCII text and the platform default encoding may not handle it.
    """
    example_code = '''
import joblib
import numpy as np
import pandas as pd


class HousingPricePredictor:
    def __init__(self, model_path='housing_price_predictor.pkl'):
        """加载模型"""
        self.model_info = joblib.load(model_path)
        self.model = self.model_info['model']
        self.scaler = self.model_info['scaler']
        self.feature_names = self.model_info['feature_names']

    def preprocess_input(self, input_data):
        """预处理输入数据"""
        # 确保输入为DataFrame
        if isinstance(input_data, dict):
            df = pd.DataFrame([input_data])
        elif isinstance(input_data, pd.DataFrame):
            df = input_data.copy()
        else:
            raise ValueError("输入应为字典或DataFrame")
        # Recreate engineered training-time features when absent.
        # NOTE: must mirror create_features() used before training.
        if 'TotalRooms' in self.feature_names and 'TotalRooms' not in df.columns:
            df['TotalRooms'] = df['AveRooms'] * df['HouseAge']
            df['RoomsPerHousehold'] = df['AveRooms'] / (df['AveOccup'] + 1)
            df['IncomePerRoom'] = df['MedInc'] / (df['AveRooms'] + 1)
        # 确保列顺序正确
        df = df[self.feature_names]
        # 标准化
        df_scaled = self.scaler.transform(df)
        return df_scaled

    def predict(self, input_data):
        """预测房价"""
        processed_data = self.preprocess_input(input_data)
        prediction = self.model.predict(processed_data)
        return prediction[0]

    def batch_predict(self, input_data_list):
        """批量预测"""
        predictions = []
        for data in input_data_list:
            pred = self.predict(data)
            predictions.append(pred)
        return predictions


# 使用示例
if __name__ == "__main__":
    # 创建预测器
    predictor = HousingPricePredictor()
    # 示例输入
    example_input = {
        'MedInc': 8.3252,
        'HouseAge': 41.0,
        'AveRooms': 6.984127,
        'AveBedrms': 1.023810,
        'Population': 322.0,
        'AveOccup': 2.555556,
        'Latitude': 37.88,
        'Longitude': -122.23
    }
    # 预测
    predicted_price = predictor.predict(example_input)
    print(f"预测房价: ${predicted_price * 100000:,.2f}")
'''
    # Explicit utf-8: the script contains non-ASCII text.
    with open('predictor_api.py', 'w', encoding='utf-8') as f:
        f.write(example_code)
    print("API示例代码已保存为 predictor_api.py")