import pandas as pd
import numpy as np
import lightgbm as lgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, confusion_matrix
import os
import time
import datetime
import joblib
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# --- Directory constants used by this training logic ---
MODEL_DIR = 'model'
PREDICTIONS_DIR = os.path.join(MODEL_DIR, 'predictions')
FEATURE_IMPORTANCE_DIR = 'feature-importance'


# LightGBM early-stopping callback factory
def early_stopping(stopping_rounds, verbose=True):
    return lgb.early_stopping(stopping_rounds=stopping_rounds, verbose=verbose)


# Global constants
# TIMESTAMP = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")  # now generated inside train_and_evaluate_model

# Make sure all required directories exist
os.makedirs(MODEL_DIR, exist_ok=True)
os.makedirs(PREDICTIONS_DIR, exist_ok=True)
os.makedirs(FEATURE_IMPORTANCE_DIR, exist_ok=True)


def train_and_evaluate_model(dataset: pd.DataFrame, model_name: str):
    """
    Train a LightGBM model and perform detailed evaluation, aggregation and result filtering.

    Args:
        dataset (pd.DataFrame): Full dataset containing the features and a 'label' column.
        model_name (str): Base model name, intended for building timestamped output file names
            (currently not used when building the saved file name).

    Returns:
        tuple[float, str]: (test-set accuracy, path of the saved model file).

    Raises:
        ValueError: If the dataset does not contain a 'label' column.
    """
    # Generate the timestamp inside the function so every call gets a fresh one
    TIMESTAMP = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    t1 = time.time()

    if 'label' not in dataset.columns:
        raise ValueError("The dataset must contain a 'label' column as the target variable.")

    Y = dataset['label']
    exclude_features = ['uuid', 'flow_uid', 'src_ip', 'dst_ip', 'server_name_indicator', 'src_port', 'dst_port']
    numeric_cols = dataset.select_dtypes(include=np.number).columns.tolist()
    training_features = [col for col in numeric_cols if col not in exclude_features and col != 'label']
    X = dataset[training_features]

    # Identifier columns (not used further in this function)
    identifier_columns = ['src_ip', 'dst_ip', 'protocol', 'src_port', 'dst_port']

    logger.info(f"Feature matrix shape (used for training): {X.shape}")

    X_train, X_temp, Y_train, Y_temp = train_test_split(X, Y, test_size=0.1, random_state=2019, stratify=Y)
    X_val, X_test, Y_val, Y_test = train_test_split(X_temp, Y_temp, test_size=0.2, random_state=2019, stratify=Y_temp)

    # Raw rows corresponding to the test split (kept for downstream inspection)
    original_test_data = dataset.loc[Y_test.index].copy()

    logger.info(f'Train size: {len(X_train)}, validation size: {len(X_val)}, test size: {len(X_test)}')
    logger.info('Starting LightGBM training...')

    num_class = len(Y.unique())
    logger.info(f"Detected {num_class} classes.")

    # --- Model tuning: add class_weight and increase n_estimators ---
    clf = lgb.LGBMClassifier(
        objective='multiclass',
        num_class=num_class,
        # Learning rate & model complexity
        learning_rate=0.05,
        n_estimators=4000,
        num_leaves=127,
        max_depth=-1,
        # Split / leaf constraints
        min_data_in_leaf=20,
        min_sum_hessian_in_leaf=1e-3,
        min_split_gain=0.0,        # correct sklearn-API name (not the min_gain_to_split alias)
        # Sampling (use only this set of names to avoid alias conflicts)
        colsample_bytree=0.9,      # do not also pass feature_fraction
        subsample=0.9,             # do not also pass bagging_fraction
        subsample_freq=1,
        # Other
        max_bin=511,
        class_weight='balanced',
        random_state=2019,
        n_jobs=-1,
        verbosity=-1
    )

    # --- Model tuning: more patience for early stopping ---
    clf.fit(
        X_train, Y_train,
        eval_set=[(X_val, Y_val)],
        callbacks=[early_stopping(stopping_rounds=50)]  # increased from 10 to 50 rounds
    )

    model_save_path = os.path.join(MODEL_DIR, f'webshell_lgbm_classifier.lgb.{TIMESTAMP}.joblib')
    joblib.dump(clf, model_save_path)
    logger.info(f'Model (LGBMClassifier) saved to: {model_save_path}')
    logger.info(f'Training took {time.time() - t1:.2f} s')

    t2 = time.time()
    y_pred = clf.predict(X_test, num_iteration=clf.best_iteration_)
    logger.info(f'Test-set prediction and processing took {time.time() - t2:.2f} s')

    logger.info("--- Model performance evaluation ---")
    accuracy = accuracy_score(Y_test, y_pred)
    logger.info(f"Accuracy: {accuracy:.4f}")
    # logger.info(f"Confusion matrix:\n{confusion_matrix(Y_test, y_pred)}")

    return accuracy, model_save_path
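

# Minimal usage sketch (not part of the original module): it assumes the feature table
# lives in a CSV named 'training_dataset.csv' (hypothetical path) with numeric feature
# columns plus a 'label' column. Adjust the path and model name to your own setup.
if __name__ == "__main__":
    example_csv = "training_dataset.csv"  # hypothetical path, replace with your own data file
    if os.path.exists(example_csv):
        df = pd.read_csv(example_csv)
        acc, model_path = train_and_evaluate_model(df, model_name="webshell_lgbm_classifier")
        logger.info(f"Finished: accuracy={acc:.4f}, model saved at {model_path}")
    else:
        logger.warning(f"Example dataset '{example_csv}' not found; skipping the demo run.")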