Complete Machine Learning Workflow Guide: From Data to Deployment

This tutorial walks through a complete machine learning project, from data acquisition all the way to model deployment.

Project Overview: House Price Prediction

We will use the California housing dataset to predict median house values; the classic Boston housing dataset is no longer shipped with recent scikit-learn releases, so it is not used here.

1. Environment Setup

# Import the required libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.linear_model import LinearRegression, Ridge, Lasso
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
import joblib
import warnings
warnings.filterwarnings('ignore')

# Matplotlib font settings (SimHei first so CJK labels render; DejaVu Sans as fallback)
plt.rcParams['font.sans-serif'] = ['SimHei', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False

2. Data Collection and Exploration

2.1 Loading the Data

# Load the California housing dataset
from sklearn.datasets import fetch_california_housing

housing = fetch_california_housing()
X = pd.DataFrame(housing.data, columns=housing.feature_names)
y = pd.Series(housing.target, name='MedHouseVal')

print(f"数据集形状: {X.shape}")
print(f"特征名称: {X.columns.tolist()}")

2.2 Exploring the Data

def explore_data(X, y):
    """Basic exploratory data analysis."""
    print("=== Data overview ===")
    X.info()  # info() prints directly and returns None

    print("\n=== Descriptive statistics ===")
    print(X.describe())

    print("\n=== Missing-value check ===")
    print(X.isnull().sum())

    print("\n=== Target distribution ===")
    fig, axes = plt.subplots(1, 2, figsize=(12, 4))

    # Target variable distribution
    axes[0].hist(y, bins=50, edgecolor='black', alpha=0.7)
    axes[0].set_xlabel('House price (in $100,000s)')
    axes[0].set_ylabel('Frequency')
    axes[0].set_title('Target distribution')

    # Feature correlation heatmap
    corr_matrix = X.corr()
    sns.heatmap(corr_matrix, annot=True, fmt='.2f', cmap='coolwarm',
                center=0, ax=axes[1])
    axes[1].set_title('Feature correlation heatmap')

    plt.tight_layout()
    plt.show()

    return X

X = explore_data(X, y)

3. Data Preprocessing

3.1 Handling Outliers

def handle_outliers(df, columns, method='iqr', threshold=1.5):
    """Clip outliers to IQR-based bounds."""
    df_clean = df.copy()

    for col in columns:
        if method == 'iqr':
            Q1 = df[col].quantile(0.25)
            Q3 = df[col].quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - threshold * IQR
            upper_bound = Q3 + threshold * IQR

            # Replace outliers with the boundary values
            df_clean[col] = df[col].clip(lower_bound, upper_bound)

    return df_clean

# Clip outliers in the numeric features
numeric_features = X.columns.tolist()
X_clean = handle_outliers(X, numeric_features)
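
To see what the clipping actually changed, it can help to compare the range of a heavily skewed feature before and after. A quick check (AveOccup is just one illustrative choice; any numeric column works):

# Quick check: how much did the IQR clipping tighten a skewed feature?
print("Before clipping:\n", X['AveOccup'].describe()[['min', 'max']])
print("After clipping:\n", X_clean['AveOccup'].describe()[['min', 'max']])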

3.2 Feature Engineering

def create_features(df):
    """Create additional engineered features."""
    df_new = df.copy()

    # Interaction between average rooms and house age
    df_new['TotalRooms'] = df['AveRooms'] * df['HouseAge']

    # Rooms relative to occupancy (the dataset has no Households column; AveOccup is population per household)
    df_new['RoomsPerHousehold'] = df['AveRooms'] / (df['AveOccup'] + 1)

    # Income relative to average rooms
    df_new['IncomePerRoom'] = df['MedInc'] / (df['AveRooms'] + 1)

    return df_new

X_engineered = create_features(X_clean)

3.3 Train/Test Split and Standardization

# Split into training and test sets
X_train, X_test, y_train, y_test = train_test_split(
    X_engineered, y, test_size=0.2, random_state=42
)

print(f"训练集大小: {X_train.shape}")
print(f"测试集大小: {X_test.shape}")

# Standardize features (fit the scaler on the training set only)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Convert back to DataFrames, keeping column names and the original index
X_train_scaled = pd.DataFrame(X_train_scaled, columns=X_train.columns, index=X_train.index)
X_test_scaled = pd.DataFrame(X_test_scaled, columns=X_test.columns, index=X_test.index)
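
Note that the cross-validation in the next section will see data scaled with statistics from the full training set. If you want the scaler re-fit inside every fold (and to put the Pipeline import from section 1 to use), the scaler and model can be chained into one estimator. A minimal sketch, assuming the X_train/y_train defined above; the random forest here is only an example model:

# Optional: chain scaling and the model so cross-validation re-fits the scaler per fold
rf_pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('model', RandomForestRegressor(n_estimators=100, random_state=42))
])

# A Pipeline is used exactly like a single estimator
cv_r2 = cross_val_score(rf_pipeline, X_train, y_train, cv=5, scoring='r2')
print(f"Pipeline CV R²: {cv_r2.mean():.4f} (±{cv_r2.std():.4f})")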

4. Model Training and Evaluation

4.1 Defining Multiple Models for Comparison

models = {
    'Linear Regression': LinearRegression(),
    'Ridge Regression': Ridge(alpha=1.0),
    'Lasso Regression': Lasso(alpha=0.01),
    'Random Forest': RandomForestRegressor(n_estimators=100, random_state=42),
    'Gradient Boosting': GradientBoostingRegressor(n_estimators=100, random_state=42)
}

def evaluate_models(models, X_train, X_test, y_train, y_test):
    """Fit each model and report train/test metrics plus cross-validation scores."""
    results = {}

    for name, model in models.items():
        # Fit the model
        model.fit(X_train, y_train)

        # Predict on both splits
        y_pred_train = model.predict(X_train)
        y_pred_test = model.predict(X_test)

        # Metrics
        train_mse = mean_squared_error(y_train, y_pred_train)
        test_mse = mean_squared_error(y_test, y_pred_test)
        train_r2 = r2_score(y_train, y_pred_train)
        test_r2 = r2_score(y_test, y_pred_test)

        # Cross-validation on the training set
        cv_scores = cross_val_score(model, X_train, y_train,
                                    cv=5, scoring='r2')

        results[name] = {
            'model': model,
            'train_mse': train_mse,
            'test_mse': test_mse,
            'train_r2': train_r2,
            'test_r2': test_r2,
            'cv_mean': cv_scores.mean(),
            'cv_std': cv_scores.std()
        }

        print(f"\n{name}:")
        print(f"  Train R²: {train_r2:.4f}")
        print(f"  Test R²: {test_r2:.4f}")
        print(f"  CV R²: {cv_scores.mean():.4f} (±{cv_scores.std():.4f})")

    return results

# Evaluate all models
results = evaluate_models(models, X_train_scaled, X_test_scaled, y_train, y_test)
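
Since MedHouseVal is expressed in units of $100,000, it can be useful to translate the best model's test error back into dollars. A small convenience snippet (not part of the original evaluation loop):

# Report the best model's test RMSE in dollars (target unit is $100,000)
best_name = max(results, key=lambda name: results[name]['test_r2'])
rmse_dollars = np.sqrt(results[best_name]['test_mse']) * 100_000
print(f"{best_name}: test RMSE is about ${rmse_dollars:,.0f}")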

4.2 Visualizing the Model Comparison

def visualize_results(results):
    """Visualize the model comparison."""
    fig, axes = plt.subplots(2, 2, figsize=(14, 10))

    # 1. Test-set R² comparison
    model_names = list(results.keys())
    test_r2_values = [results[name]['test_r2'] for name in model_names]

    axes[0, 0].barh(model_names, test_r2_values, color='skyblue')
    axes[0, 0].set_xlabel('Test R²')
    axes[0, 0].set_title('Model performance (test R²)')
    axes[0, 0].axvline(x=0, color='gray', linestyle='--', alpha=0.5)

    # 2. Train vs. test R²
    train_r2_values = [results[name]['train_r2'] for name in model_names]

    x = np.arange(len(model_names))
    width = 0.35

    axes[0, 1].bar(x - width/2, train_r2_values, width, label='Train', color='lightblue')
    axes[0, 1].bar(x + width/2, test_r2_values, width, label='Test', color='lightcoral')
    axes[0, 1].set_xlabel('Model')
    axes[0, 1].set_ylabel('R²')
    axes[0, 1].set_title('Train vs. test R²')
    axes[0, 1].set_xticks(x)
    axes[0, 1].set_xticklabels(model_names, rotation=45)
    axes[0, 1].legend()

    # 3. Feature importances (only if the best model exposes them)
    best_model_name = max(results.keys(), key=lambda name: results[name]['test_r2'])
    best_model = results[best_model_name]['model']

    if hasattr(best_model, 'feature_importances_'):
        importances = best_model.feature_importances_
        indices = np.argsort(importances)[::-1]

        axes[1, 0].bar(range(X_train.shape[1]), importances[indices])
        axes[1, 0].set_xlabel('Feature rank')
        axes[1, 0].set_ylabel('Importance')
        axes[1, 0].set_title(f'{best_model_name} feature importances')
        axes[1, 0].set_xticks(range(X_train.shape[1]))
        axes[1, 0].set_xticklabels(X_train.columns[indices], rotation=90)

    # 4. Residual plot for the best model
    y_pred = best_model.predict(X_test_scaled)
    residuals = y_test - y_pred

    axes[1, 1].scatter(y_pred, residuals, alpha=0.5)
    axes[1, 1].axhline(y=0, color='red', linestyle='--')
    axes[1, 1].set_xlabel('Predicted value')
    axes[1, 1].set_ylabel('Residual')
    axes[1, 1].set_title(f'{best_model_name} residuals')

    plt.tight_layout()
    plt.show()

visualize_results(results)

5. Model Optimization

5.1 Hyperparameter Tuning

from sklearn.model_selection import GridSearchCV

def tune_random_forest(X_train, y_train):
    """Tune random forest hyperparameters with a grid search."""
    param_grid = {
        'n_estimators': [100, 200, 300],
        'max_depth': [10, 20, 30, None],
        'min_samples_split': [2, 5, 10],
        'min_samples_leaf': [1, 2, 4],
        'max_features': ['sqrt', 'log2']
    }

    rf = RandomForestRegressor(random_state=42)
    grid_search = GridSearchCV(
        rf, param_grid, cv=5, scoring='r2',
        n_jobs=-1, verbose=1
    )

    grid_search.fit(X_train, y_train)

    print(f"Best parameters: {grid_search.best_params_}")
    print(f"Best cross-validation score: {grid_search.best_score_:.4f}")

    return grid_search.best_estimator_

# Tune the best-performing model
best_model_name = max(results.keys(), key=lambda name: results[name]['test_r2'])
print(f"\nTuning hyperparameters for {best_model_name}...")

if best_model_name == 'Random Forest':
    optimized_model = tune_random_forest(X_train_scaled, y_train)
else:
    # Fall back to the already-fitted best model if it is not a random forest
    optimized_model = results[best_model_name]['model']
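
The grid above spans 216 parameter combinations, so with 5-fold cross-validation GridSearchCV fits the forest more than a thousand times. If that is too slow, a randomized search over the same space is a common compromise. A hedged sketch (the n_iter value is an illustrative choice, not something this tutorial prescribes):

from sklearn.model_selection import RandomizedSearchCV

def tune_random_forest_randomized(X_train, y_train, n_iter=20):
    """Cheaper alternative: sample a fixed number of parameter combinations."""
    param_distributions = {
        'n_estimators': [100, 200, 300],
        'max_depth': [10, 20, 30, None],
        'min_samples_split': [2, 5, 10],
        'min_samples_leaf': [1, 2, 4],
        'max_features': ['sqrt', 'log2']
    }

    search = RandomizedSearchCV(
        RandomForestRegressor(random_state=42),
        param_distributions,
        n_iter=n_iter,        # number of sampled combinations (illustrative)
        cv=5,
        scoring='r2',
        n_jobs=-1,
        random_state=42
    )
    search.fit(X_train, y_train)

    print(f"Best parameters: {search.best_params_}")
    print(f"Best CV R²: {search.best_score_:.4f}")
    return search.best_estimator_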

6. Model Deployment

6.1 Saving the Model

def save_model_pipeline(model, scaler, feature_names, model_name='best_model'):
    """Save the model, the fitted scaler, and metadata."""
    import datetime

    # Bundle everything needed at inference time
    model_info = {
        'model': model,
        'scaler': scaler,
        'feature_names': feature_names,
        'created_at': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        'version': '1.0'
    }

    # Persist the bundle
    joblib.dump(model_info, f'{model_name}.pkl')
    print(f"Model saved as {model_name}.pkl")

    # Save JSON metadata (drop the model object and cast numpy floats so json can serialize it)
    performance = {
        k: float(v) for k, v in results.get(best_model_name, {}).items() if k != 'model'
    }
    metadata = {
        'model_type': type(model).__name__,
        'features': feature_names,
        'created_at': model_info['created_at'],
        'performance': performance
    }

    import json
    with open(f'{model_name}_metadata.json', 'w') as f:
        json.dump(metadata, f, indent=2)

    return model_info

# Save the best-performing model
best_model_name = max(results.keys(), key=lambda x: results[x]['test_r2'])
best_model = results[best_model_name]['model']

model_info = save_model_pipeline(
    best_model, scaler, X_train.columns.tolist(), 
    model_name='housing_price_predictor'
)
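
Before relying on the saved file, it is worth reloading it and checking that it reproduces the in-memory model's predictions. A small sanity check, assuming housing_price_predictor.pkl was just written by the call above:

# Reload the artifact and compare predictions on a few test rows
loaded = joblib.load('housing_price_predictor.pkl')
sample = X_test_scaled.iloc[:5]
print("Reloaded model :", loaded['model'].predict(sample))
print("In-memory model:", best_model.predict(sample))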

6.2 Creating a Prediction API Example

def create_prediction_api_example():
    """Write a simple prediction wrapper to predictor_api.py."""
    example_code = '''
import joblib
import numpy as np
import pandas as pd

class HousingPricePredictor:
    def __init__(self, model_path='housing_price_predictor.pkl'):
        """Load the saved model bundle."""
        self.model_info = joblib.load(model_path)
        self.model = self.model_info['model']
        self.scaler = self.model_info['scaler']
        self.feature_names = self.model_info['feature_names']

    def preprocess_input(self, input_data):
        """Preprocess raw input into the scaled feature matrix."""
        # Accept a dict or a DataFrame
        if isinstance(input_data, dict):
            df = pd.DataFrame([input_data])
        elif isinstance(input_data, pd.DataFrame):
            df = input_data.copy()
        else:
            raise ValueError("Input must be a dict or a DataFrame")

        # Recreate the engineered features used during training
        df['TotalRooms'] = df['AveRooms'] * df['HouseAge']
        df['RoomsPerHousehold'] = df['AveRooms'] / (df['AveOccup'] + 1)
        df['IncomePerRoom'] = df['MedInc'] / (df['AveRooms'] + 1)

        # Enforce the training column order
        df = df[self.feature_names]

        # Apply the fitted scaler
        df_scaled = self.scaler.transform(df)

        return df_scaled

    def predict(self, input_data):
        """Predict the median house value for one input."""
        processed_data = self.preprocess_input(input_data)
        prediction = self.model.predict(processed_data)
        return prediction[0]

    def batch_predict(self, input_data_list):
        """Predict for a list of inputs."""
        predictions = []
        for data in input_data_list:
            pred = self.predict(data)
            predictions.append(pred)
        return predictions

# Usage example
if __name__ == "__main__":
    # Create the predictor
    predictor = HousingPricePredictor()

    # Example input (raw California housing features)
    example_input = {
        'MedInc': 8.3252,
        'HouseAge': 41.0,
        'AveRooms': 6.984127,
        'AveBedrms': 1.023810,
        'Population': 322.0,
        'AveOccup': 2.555556,
        'Latitude': 37.88,
        'Longitude': -122.23
    }

    # Predict (the target is in units of $100,000)
    predicted_price = predictor.predict(example_input)
    print(f"Predicted house price: ${predicted_price * 100000:,.2f}")
'''

    with open('predictor_api.py', 'w') as f:
        f.write(example_code)

    print("API example code saved as predictor_api.py")

create_prediction_api_example()
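
The class written above is a plain Python wrapper; to serve it over HTTP you would typically put a small web framework in front of it. A hedged sketch using Flask (Flask is not among this tutorial's stated dependencies, and the endpoint name and JSON payload format are assumptions for illustration):

# serve_api.py: a minimal HTTP wrapper around HousingPricePredictor (illustrative only)
from flask import Flask, request, jsonify

from predictor_api import HousingPricePredictor

app = Flask(__name__)
predictor = HousingPricePredictor()  # loads housing_price_predictor.pkl once at startup

@app.route('/predict', methods=['POST'])
def predict():
    """Expect a JSON object of raw feature values; return the predicted median value."""
    payload = request.get_json()
    try:
        prediction = predictor.predict(payload)
    except (KeyError, ValueError) as exc:
        return jsonify({'error': str(exc)}), 400
    return jsonify({'predicted_median_value': float(prediction)})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

A client could then POST the example_input dictionary shown earlier to http://localhost:5000/predict and receive the predicted median value as JSON.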