#include #include "KafkaConsumer.h" #include "mongoose.h" #include "minio_download.h" #include "AiModule.h" #include "curlget.h" #include "cJSON.h" #include "thread_local_logger.h" //是否触发训练阈值。 int max_alert ; //kafka配置 char *brokersbytes; char *topic_cnn_bytes; char *topic_gbm_bytes; char *group_cnn; char *group_gbm; std::string produce_topic_cnn_str;// 生产的主题 std::string produce_topic_gbm_str; //服务配置 char *urlbytes; char *mngurl; //cnn 参数 char *model_name_cnn; char *model_path_cnn; //char *jsonl_file_path_cnn; //gbm 参数 char *model_name_gbm; char *model_path_gbm; //char *jsonl_file_path_gbm; //cnn update 参数 char *model_key; //char *white_data; char *black_base_path; int max_samples; //gbm update 参数 char *model_name; //char *jsonl_file_path; char *jsonl_content_string; int threshold; //minio 参数 char *endpoint; char *access_key; char *secret_key; //http消息处理 //前端发送的预测及训练参数,格式如下: /* { "model_name_cnn":"classification_cs_20250714.h5", "model_path_cnn":"./", "model_name_gbm":"webshell_lgbm_classifier.lgb.20250902_171938.joblib", "":"./"model_path_gbm, "max_alert":1000, "model_name":"lgb_webshell", "jsonl_file_path":"white_features.jsonl", "jsonl_content_string":"", "threshold":1000, "model_key":"cnn_webshell", "white_data":"white samples.jsonl", "black_base_path":"", "max_samples":1000 } */ void setGlobalCfg(); static void fn(struct mg_connection* c, int ev, void* ev_data, void* fn_data) { //MG_EV_HTTP_MSG表示为http请求 if (ev == MG_EV_HTTP_MSG) { struct mg_http_message* hm = (struct mg_http_message*)ev_data; //get请求示例,显示Hello Mongoose页面 if (mg_match(hm->uri, mg_str("/hello"), NULL)) { mg_http_reply(c, 200, "", "mongoose demo

Hello Mongoose!

\n"); } //post请求示例,请求体json示例{"isTrain":1} else if (mg_match(hm->uri, mg_str("/api/setParams"), NULL)) { //cnn预测 model_name_cnn = mg_json_get_str(hm->body,"$.model_name_cnn"); model_path_cnn = mg_json_get_str(hm->body,"$.model_path_cnn"); //gbm预测 //model_name_gbm = mg_json_get_str(hm->body,"$.model_name_gbm"); //model_path_gbm = mg_json_get_str(hm->body,"$.model_path_gbm"); //cnn训练 model_key = mg_json_get_str(hm->body,"$.model_key"); black_base_path = mg_json_get_str(hm->body,"$.black_base_path"); double a = 0; mg_json_get_num(hm->body, "$.max_samples", &a); max_samples = (int)a; //gbm训练 model_name = mg_json_get_str(hm->body,"$.model_name"); jsonl_content_string = mg_json_get_str(hm->body,"$.jsonl_content_string"); mg_json_get_num(hm->body, "$.threshold", &a); threshold = (int)a; //阀值 mg_json_get_num(hm->body, "$.max_alert", &a); max_alert = (int)a; //保存到配置文件 setGlobalCfg(); mg_http_reply(c, 200, "Content-Type: application/json\r\n", "{\"result\":%d}", 0); } //显示当前目录文件列表 else { struct mg_http_serve_opts opts = { 0 }; opts.root_dir = "."; mg_http_serve_dir(c, hm, &opts); } } } void* tfSetParam(void *arg) { //创建事件管理器 struct mg_mgr mgr; //设置日志级别为DEBUG,如果不做设置,默认情况下运行程序控制台不打印任何请求响应消息 mg_log_set(MG_LL_DEBUG); //初始化事件管理器 mg_mgr_init(&mgr); //创建http监听 mg_http_listen(&mgr, arg, fn, NULL); //开启事件循环 for (;;) { // 1000ms为超时时长 mg_mgr_poll(&mgr, 1000); } mg_mgr_free(&mgr); return 0; } // 加载并解析 INI 配置文件 void loadGlobalCfg(){ LOG_INFO("读取本地配置。"); dictionary *ini = iniparser_load("config_cnn.ini"); if (ini == NULL) { LOG_ERROR("无法加载 config_cnn.ini!"); return 0; } // 读取kafka配置 brokersbytes = iniparser_getstring(ini, "kafka:brokers", "111.32.12.11:9092"); topic_cnn_bytes = iniparser_getstring(ini, "kafka:topic_cnn", "analyzed_queue_fj"); topic_gbm_bytes = iniparser_getstring(ini, "kafka:topic_gbm", "analyzed_queue_fj"); group_cnn = iniparser_getstring(ini, "kafka:group_cnn", "cnnGroup"); group_gbm = iniparser_getstring(ini, "kafka:group_gbm", "gbmGroup"); char * produce_topic_cnn = iniparser_getstring(ini, "kafka:produce_topic_cnn", "alarmed_queue_cnn"); produce_topic_cnn_str = produce_topic_cnn; char * produce_topic_gbm = iniparser_getstring(ini, "kafka:produce_topic_gbm", "alarmed_queue_gbm"); produce_topic_gbm_str = produce_topic_gbm; LOG_DEBUG("kafka brokers: ", brokersbytes); LOG_DEBUG("kafka topic_cnn: ", topic_cnn_bytes); LOG_DEBUG("kafka topic_gbm: ", topic_gbm_bytes); LOG_DEBUG("kafka group_cnn: ", group_cnn); LOG_DEBUG("kafka group_gbm: ", group_gbm); LOG_DEBUG("kafka produce_topic_cnn: ", produce_topic_cnn_str); LOG_DEBUG("kafka produce_topic_gbm: ", produce_topic_gbm_str); // 读取minio配置 endpoint = iniparser_getstring(ini, "minio:endpoint", "http://111.32.12.11:9000"); access_key = iniparser_getstring(ini, "minio:access_key", "UDM59PO2GGFR6AM8LSXP"); secret_key = iniparser_getstring(ini, "minio:secret_key", "YShs5u5aj5Znc7kZkfEkel1QagG9eKko+C0mRUHr"); LOG_DEBUG("minio endpoint: ", endpoint); LOG_DEBUG("minio access_key: ", access_key); LOG_DEBUG("minio secret_key: ", secret_key); // 读取服务配置 urlbytes = iniparser_getstring(ini, "server:urlcnn", "http://0.0.0.0:8087"); LOG_DEBUG("server url: ", urlbytes); mngurl = iniparser_getstring(ini, "server:mngurl", "http://127.0.0.1:8990/api/v1/model/getsyncModelData"); LOG_DEBUG("manage url: ", mngurl); // 读取cnn训练配置 model_key = iniparser_getstring(ini, "cnnupdate:model_key", "cnn_webshell"); //white_data = iniparser_getstring(ini, "cnnupdate:white_data", "white_samples.jsonl"); black_base_path = iniparser_getstring(ini, "cnnupdate:black_base_path", ""); max_samples = iniparser_getint(ini, "cnnupdate:max_samples", 1000); LOG_DEBUG("cnnupdate:model_key: ", model_key); LOG_DEBUG("cnnupdate:black_base_path: ", black_base_path); LOG_DEBUG("cnnupdate:max_samples: ", max_samples); // 读取gbm训练配置 model_name = iniparser_getstring(ini, "gbmupdate:model_name", "cnn_webshell"); //jsonl_file_path = iniparser_getstring(ini, "gbmupdate:jsonl_file_path", "white_features.jsonl"); jsonl_content_string = iniparser_getstring(ini, "gbmupdate:jsonl_content_string", ""); threshold = iniparser_getint(ini, "gbmupdate:threshold", 1000); LOG_DEBUG("gbmupdate:model_name: ", model_name); LOG_DEBUG("gbmupdate:jsonl_content_string: ", jsonl_content_string); LOG_DEBUG("gbmupdate:threshold: ", threshold); //cnn 参数 model_name_cnn = iniparser_getstring(ini, "cnn:model_name", "classification_cs_20250714.h5"); model_path_cnn = iniparser_getstring(ini, "cnn:model_path", "./"); //jsonl_file_path_cnn = iniparser_getstring(ini, "cnn:jsonl_file_path", "cnn_output.jsonl"); //gbm 参数 model_name_gbm = iniparser_getstring(ini, "gbm:model_name", "webshell_lgbm_classifier.lgb.20250902_171938.joblib"); model_path_gbm = iniparser_getstring(ini, "gbm:model_path", "./"); //jsonl_file_path_gbm = iniparser_getstring(ini, "gbm:jsonl_file_path", "gbm_output.jsonl"); //阀值 max_alert = iniparser_getint(ini, "alert:max_alert", 1000); //释放字典资源 //iniparser_freedict(ini); } void setGlobalCfg() { // 1. 加载或创建 INI 文件(若文件不存在,会创建空字典) dictionary *ini = iniparser_load("config_cnn.ini"); if (ini == NULL) { LOG_ERROR("无法加载或创建配置文件"); return; } // 2. 设置配置项(格式:"section:key",值需为字符串) // 2.1 写入 [cnn] 节 //cnn 参数 iniparser_set(ini, "cnn:model_name", model_name_cnn); iniparser_set(ini, "cnn:model_path", model_path_cnn); // 2.2 写入 [gbm] 节 //gbm 参数 iniparser_set(ini, "gbm:model_name", model_name_gbm); iniparser_set(ini, "gbm:model_path", model_path_gbm); // 2.3 写入 [cnnupdate] 节 // cnn训练配置 iniparser_set(ini, "cnnupdate:model_key", model_key); //iniparser_set(ini, "cnnupdate:white_data", white_data); iniparser_set(ini, "cnnupdate:black_base_path", black_base_path); char str_max_samples[20]; sprintf(str_max_samples, "%d", max_samples); iniparser_set(ini, "cnnupdate:max_samples", str_max_samples); // 2.4 写入 [gbmupdate] 节 // gbm训练配置 iniparser_set(ini, "gbmupdate:model_name", model_name); //iniparser_set(ini, "gbmupdate:jsonl_file_path", jsonl_file_path); iniparser_set(ini, "gbmupdate:jsonl_content_string", jsonl_content_string); char str_threshold[20]; sprintf(str_threshold, "%d", threshold); iniparser_set(ini, "gbmupdate:threshold", str_threshold); // 2.4 写入 [alert] 节 //阀值 char str_max_alert[20]; sprintf(str_max_alert, "%d", max_alert); iniparser_set(ini, "alert:max_alert", str_max_alert); // 3. 保存配置到文件 FILE *fIni = fopen("config_cnn.ini", "w"); if (fIni) { iniparser_dump_ini(ini, fIni); fclose(fIni); } else { LOG_ERROR("Failed to open file for writing"); iniparser_freedict(ini); // 释放资源 return; } LOG_INFO("配置文件写入成功!"); //4. 释放字典资源 iniparser_freedict(ini); } class ProducerDeliveryReportCb : public RdKafka::DeliveryReportCb { public: void dr_cb (RdKafka::Message &message) { LOG_INFO("Message delivery for (" , message.len() , " bytes): " , message.errstr() ); if (message.key()) LOG_INFO("Key: " , *(message.key()) , ";" ); } }; class KafkaEventCb : public RdKafka::EventCb { public: void event_cb (RdKafka::Event &event) { switch (event.type()) { case RdKafka::Event::EVENT_ERROR: LOG_ERROR("ERROR (" , RdKafka::err2str(event.err()) , "): " , event.str() ); if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN) // run = false; break; case RdKafka::Event::EVENT_STATS: LOG_ERROR("\"STATS\": " , event.str()); break; case RdKafka::Event::EVENT_LOG: fprintf(stderr, "LOG-%i-%s: %s\n", event.severity(), event.fac().c_str(), event.str().c_str()); break; default: LOG_ERROR("EVENT " , event.type() , " (" , RdKafka::err2str(event.err()) , "): " , event.str() ); break; } } }; //kafka处理 void* tfKafka(void *arg){ int model = *(int *)arg; std::string modStr; if(0 == model){ modStr = "cnn"; }else{ modStr = "gbm"; } ThreadLocalLogger::init(LogTarget::FILE,"./model_thread_logs", LogLevel::DEBUG,modStr,false); LOG_DEBUG("启动 ", modStr,"预测及训练!"); std::string brokers = brokersbytes; std::string errstr; //生产者 RdKafka::Conf *producer_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); ProducerDeliveryReportCb dr_cb; // 设置生产者配置 if (producer_conf->set("bootstrap.servers", brokers, errstr) != RdKafka::Conf::CONF_OK) { LOG_ERROR("生产者配置失败: ", errstr); //std::cerr << "生产者配置失败: " << errstr << std::endl; delete producer_conf; return NULL; } KafkaEventCb ex_event_cb; producer_conf->set("event_cb", &ex_event_cb, errstr); // 设置消息发送结果回调 if (producer_conf->set("dr_cb", &dr_cb, errstr) != RdKafka::Conf::CONF_OK) { LOG_ERROR("设置回调失败: ", errstr); //std::cerr << "设置回调失败: " << errstr << std::endl; delete producer_conf; return NULL; } // 创建生产者实例 RdKafka::Producer *producer = RdKafka::Producer::create(producer_conf, errstr); if (!producer) { LOG_ERROR("创建生产者失败: ", errstr); //std::cerr << "创建生产者失败: " << errstr << std::endl; delete producer_conf; return NULL; } delete producer_conf; // 配置对象已被生产者使用,可释放 // std::vector topics; char *groupbytes = NULL; if(model == 0){ topics.push_back(topic_cnn_bytes); groupbytes = group_cnn; }else{ topics.push_back(topic_gbm_bytes); groupbytes = group_gbm; } std::string group = groupbytes; PyGILState_STATE state = PyGILState_Ensure(); KafkaConsumer consumer(brokers, group, topics, RdKafka::Topic::OFFSET_BEGINNING); consumer.pullMessage(model,producer); RdKafka::wait_destroyed(5000); ThreadLocalLogger::destroy(); PyGILState_Release(state); return NULL; } void getParamFromMng(){ LOG_INFO("到管理端获取配置。"); ResponseData response; getParam(mngurl,&response); LOG_DEBUG("响应长度: ", response.size," 字节"); LOG_DEBUG("响应内容: ", response.buffer); //解析返回的参数。如果返回失败,则使用本地配置;成功,则用管理端获取的参数覆盖 if(response.size > 0 && response.buffer != NULL){ // 解析 JSON 字符串 cJSON *root = cJSON_Parse(response.buffer); if (root == NULL) { LOG_DEBUG("解析错误: ", cJSON_GetErrorPtr()); }else{ // 获取字段值 LOG_DEBUG("json解析成功"); //cJSON *j_white_data = cJSON_GetObjectItem(root, "white_data"); cJSON *j_model_key = cJSON_GetObjectItem(root, "model_key"); cJSON *j_max_alert = cJSON_GetObjectItem(root, "max_alert"); cJSON *j_model_path_cnn = cJSON_GetObjectItem(root, "model_path_cnn"); cJSON *j_jsonl_content_string = cJSON_GetObjectItem(root, "jsonl_content_string"); cJSON *j_threshold = cJSON_GetObjectItem(root, "threshold"); cJSON *j_black_base_path = cJSON_GetObjectItem(root, "black_base_path"); //cJSON *j_jsonl_file_path = cJSON_GetObjectItem(root, "jsonl_file_path"); //cJSON *j_model_name_gbm = cJSON_GetObjectItem(root, "model_name_gbm"); //cJSON *j_jsonl_file_path_gbm = cJSON_GetObjectItem(root, "jsonl_file_path_gbm"); cJSON *j_max_samples = cJSON_GetObjectItem(root, "max_samples"); cJSON *j_model_name = cJSON_GetObjectItem(root, "model_name"); //cJSON *j_model_path_gbm = cJSON_GetObjectItem(root, "model_path_gbm"); cJSON *j_model_name_cnn = cJSON_GetObjectItem(root, "model_name_cnn"); //cJSON *j_jsonl_file_path_cnn = cJSON_GetObjectItem(root, "jsonl_file_path_cnn"); //告警数阀值 if(j_max_alert != NULL && j_max_alert->type == cJSON_Number){ max_alert = j_max_alert->valueint; } // 读取cnn训练配置 if(j_model_key != NULL){ model_key = j_model_key->valuestring; } // if(j_white_data != NULL){ // white_data = j_white_data->valuestring; // } if(j_black_base_path != NULL){ black_base_path = j_black_base_path->valuestring; } if(j_max_samples != NULL && j_max_alert->type == cJSON_Number){ max_samples = j_max_samples->valueint; } // 读取gbm训练配置 if(j_model_name != NULL){ model_name = j_model_name->valuestring; } // if(j_jsonl_file_path != NULL){ // jsonl_file_path = j_jsonl_file_path->valuestring; // } if(j_jsonl_content_string != NULL){ jsonl_content_string = j_jsonl_content_string->valuestring; } if(j_threshold != NULL && j_max_alert->type == cJSON_Number){ threshold = j_threshold->valueint; } //cnn 参数 if(j_model_name_cnn != NULL){ model_name_cnn = j_model_name_cnn->valuestring; } if(j_model_path_cnn != NULL){ model_path_cnn = j_model_path_cnn->valuestring; } // if(j_jsonl_file_path_cnn != NULL){ // jsonl_file_path_cnn = j_jsonl_file_path_cnn->valuestring; // } //gbm 参数 // if(j_model_path_gbm != NULL){ // model_path_gbm = j_model_path_gbm->valuestring; // } // if(j_model_name_gbm != NULL){ // model_name_gbm = j_model_name_gbm->valuestring; // } // if(j_jsonl_file_path_gbm != NULL){ // jsonl_file_path_gbm = j_jsonl_file_path_gbm->valuestring; // } LOG_DEBUG("json解析结束"); } } } int main(int argc, char* argv[]) { ThreadLocalLogger::init(LogTarget::CONSOLE,"", LogLevel::DEBUG,"",false); //加载本地配置 loadGlobalCfg(); //到管理端获取配置 getParamFromMng(); //初始化python解析器 Py_InitializeEx(1); Py_Initialize(); PyThreadState* main_thread_state = PyEval_SaveThread(); //单独线程,用于接收是否处于模型训练状态的请求 LOG_INFO("启动http服务线程..."); pthread_t threadHttp; pthread_create(&threadHttp, NULL, tfSetParam, urlbytes); LOG_DEBUG("触发训练告警阀值[max_alert]:" , max_alert); //单独线程,用于两种模型的处理 LOG_INFO("启动模型线程..."); pthread_t threadKafkaCnn,threadKafkaGbm; int modelCnn = 0; //0:cnn;1:gbm pthread_create(&threadKafkaCnn, NULL, tfKafka, &modelCnn); int modelGbm = 1; //pthread_create(&threadKafkaGbm, NULL, tfKafka, &modelGbm); pthread_join(threadKafkaCnn, NULL); //pthread_join(threadKafkaGbm, NULL); pthread_join(threadHttp, NULL); PyEval_RestoreThread(main_thread_state); Py_Finalize(); return 0; }