#include <cstdio>
#include <cstring>
#include <string>
#include <vector>
#include "KafkaConsumer.h"
#include "cJSON.h"
#include "AiModule.h"
#include "minio_download.h"
#include "thread_local_logger.h"
#include "iniparser.h"

extern int max_alert;

// MinIO parameters
extern char *endpoint;
extern char *access_key;
extern char *secret_key;

// CNN parameters
extern char *model_name_cnn;
extern char *model_path_cnn;
extern char *jsonl_file_path_cnn;

// GBM parameters
extern char *model_name_gbm;
extern char *model_path_gbm;
extern char *jsonl_file_path_gbm;

// CNN update parameters
extern char *model_key;
extern char *white_data;
extern char *black_base_path;
extern int max_samples;

// GBM update parameters
extern char *model_name;
extern char *jsonl_file_path;
extern char *jsonl_content_string;
extern int threshold;

// Kafka produce topics
extern std::string produce_topic_cnn_str;
extern std::string produce_topic_gbm_str;

class ConsumerEventCb : public RdKafka::EventCb
{
public:
    void event_cb(RdKafka::Event &event)
    {
        switch (event.type())
        {
        case RdKafka::Event::EVENT_ERROR:
            if (event.fatal())
            {
                LOG_ERROR("FATAL ");
            }
            LOG_ERROR("ERROR (", RdKafka::err2str(event.err()), "): ", event.str());
            break;
        case RdKafka::Event::EVENT_STATS:
            LOG_ERROR("\"STATS\": ", event.str());
            break;
        case RdKafka::Event::EVENT_LOG:
            fprintf(stderr, "LOG-%i-%s: %s\n", event.severity(), event.fac().c_str(), event.str().c_str());
            break;
        case RdKafka::Event::EVENT_THROTTLE:
            LOG_ERROR("THROTTLED: ", event.throttle_time(), "ms by ", event.broker_name(), " id ", (int)event.broker_id());
            break;
        default:
            LOG_ERROR("EVENT ", event.type(), " (", RdKafka::err2str(event.err()), "): ", event.str());
            break;
        }
    }
};

class ConsumerRebalanceCb : public RdKafka::RebalanceCb
{
private:
    // Print the partitions handed to this consumer
    static void printTopicPartition(const std::vector<RdKafka::TopicPartition *> &partitions)
    {
        for (unsigned int i = 0; i < partitions.size(); i++)
            LOG_ERROR(partitions[i]->topic(), "[", partitions[i]->partition(), "], ");
        //std::cerr << "\n";
    }

public:
    void rebalance_cb(RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err,
                      std::vector<RdKafka::TopicPartition *> &partitions)
    {
        LOG_ERROR("RebalanceCb: ", RdKafka::err2str(err), ": ");
        printTopicPartition(partitions);
        if (err == RdKafka::ERR__ASSIGN_PARTITIONS)
        {
            // New assignment: take over the listed partitions
            consumer->assign(partitions);
            partition_count = (int)partitions.size();
        }
        else
        {
            // Revocation: drop the current assignment
            consumer->unassign();
            partition_count = 0;
        }
    }

private:
    int partition_count;
};
KafkaConsumer::KafkaConsumer(const std::string &brokers, const std::string &groupID,
                             const std::vector<std::string> &topics, int partition)
{
    m_brokers = brokers;
    m_groupID = groupID;
    m_topicVector = topics;
    m_partition = partition;

    std::string errorStr;
    RdKafka::Conf::ConfResult errorCode;

    m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);

    m_event_cb = new ConsumerEventCb;
    errorCode = m_config->set("event_cb", m_event_cb, errorStr);
    if (errorCode != RdKafka::Conf::CONF_OK)
    {
        LOG_ERROR("Conf set failed: ", errorStr);
    }

    m_rebalance_cb = new ConsumerRebalanceCb;
    errorCode = m_config->set("rebalance_cb", m_rebalance_cb, errorStr);
    if (errorCode != RdKafka::Conf::CONF_OK)
    {
        LOG_ERROR("Conf set failed: ", errorStr);
    }

    errorCode = m_config->set("enable.partition.eof", "false", errorStr);
    if (errorCode != RdKafka::Conf::CONF_OK)
    {
        LOG_ERROR("Conf set failed: ", errorStr);
    }

    errorCode = m_config->set("group.id", m_groupID, errorStr);
    if (errorCode != RdKafka::Conf::CONF_OK)
    {
        LOG_ERROR("Conf set failed: ", errorStr);
    }

    errorCode = m_config->set("bootstrap.servers", m_brokers, errorStr);
    if (errorCode != RdKafka::Conf::CONF_OK)
    {
        LOG_ERROR("Conf set failed: ", errorStr);
    }

    errorCode = m_config->set("max.partition.fetch.bytes", "1024000", errorStr);
    if (errorCode != RdKafka::Conf::CONF_OK)
    {
        LOG_ERROR("Conf set failed: ", errorStr);
    }

    // partition.assignment.strategy: range, roundrobin
    m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

    // Start from the latest messages
    errorCode = m_topicConfig->set("auto.offset.reset", "latest", errorStr);
    if (errorCode != RdKafka::Conf::CONF_OK)
    {
        LOG_ERROR("Topic Conf set failed: ", errorStr);
    }

    errorCode = m_config->set("default_topic_conf", m_topicConfig, errorStr);
    if (errorCode != RdKafka::Conf::CONF_OK)
    {
        LOG_ERROR("Conf set failed: ", errorStr);
    }

    m_consumer = RdKafka::KafkaConsumer::create(m_config, errorStr);
    if (m_consumer == NULL)
    {
        LOG_ERROR("Create KafkaConsumer failed: ", errorStr);
    }
    LOG_ERROR("Created consumer ", m_consumer->name());
}

// Extract the file name from a path string (the part after the last '/')
const char *get_filename(const char *path)
{
    if (path == NULL)
    {
        return NULL; // Guard against a null pointer
    }

    // Find the last '/' in the path
    const char *last_slash = strrchr(path, '/');
    if (last_slash == NULL)
    {
        // No '/' found: the whole string is the file name
        return path;
    }
    else
    {
        // Return the part after the '/' (skip the '/' itself)
        return last_slash + 1;
    }
}

void updateGlobalCfg(int model, const char *config_file)
{
    // 1. Load the INI file (returns NULL if it cannot be read)
    dictionary *ini = iniparser_load(config_file);
    if (ini == NULL)
    {
        LOG_ERROR("Failed to load the configuration file");
        return;
    }

    // 2. Write the model_name entry of the matching section
    if (0 == model)
    {
        iniparser_set(ini, "cnn:model_name", model_name_cnn);
    }
    else
    {
        iniparser_set(ini, "gbm:model_name", model_name_gbm);
    }

    // 3. Save the configuration back to the file
    FILE *fIni = fopen(config_file, "w");
    if (fIni)
    {
        iniparser_dump_ini(ini, fIni);
        fclose(fIni);
    }
    else
    {
        LOG_ERROR("Failed to open file for writing");
        iniparser_freedict(ini); // Release the dictionary
        return;
    }
    LOG_INFO("Configuration file written successfully!");

    // 4. Release the dictionary
    iniparser_freedict(ini);
}
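// For reference: iniparser reads the key "cnn:model_name" (resp. "gbm:model_name")
// as entry model_name in section [cnn] (resp. [gbm]), so after updateGlobalCfg(0, "config_cnn.ini")
// the file ends up roughly like the sketch below (the model file name is only an
// illustrative placeholder):
//
//   [cnn]
//   model_name = cnn_model_v2.bin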
// Kafka message handler: run detection on the referenced jsonl file and
// report or retrain according to the resulting alert count
int kafkaProcess(std::string bucket, std::string object_name, std::string url, int model, RdKafka::Producer *producer)
{
    // The jsonl URL is now passed straight to the .so library functions,
    // so the local MinIO download below is no longer used.
    // // Initialise libcurl
    // curl_global_init(CURL_GLOBAL_DEFAULT);
    // // MinIO configuration - adjust to your deployment
    // std::string endpoint_loc = endpoint;       // your MinIO address
    // std::string access_key_loc = access_key;   // your Access Key
    // std::string secret_key_loc = secret_key;   // your Secret Key
    // //std::string bucket = "2025-09-16";                         // bucket name
    // //std::string object_name = "19/output/321.pcap_fj.jsonl";   // object to download
    // const char *bucket_bytes = bucket.c_str();
    // const char *object_name_bytes = object_name.c_str();

    const char *model_name_cnn_loc = model_name_cnn;
    const char *model_path_cnn_loc = model_path_cnn;
    const char *jsonl_file_url_loc = url.c_str();

    const char *model_name_gbm_loc = model_name_gbm;
    const char *model_path_gbm_loc = model_path_gbm;
    //const char *jsonl_file_path_gbm_loc = get_filename(object_name.c_str());

    // // Download the file
    // bool success = download_from_minio(endpoint_loc, access_key_loc, secret_key_loc, bucket, object_name, jsonl_file_path_cnn_loc);
    // // Clean up libcurl
    // curl_global_cleanup();
    bool success = true;

    if (success)
    {
        LOG_INFO("process model: ", model);
        //std::cout << "process model:" << model << std::endl;
        int result = 0;
        int alert_count = 0;
        if (0 == model)
        {
            // CNN detection (mirrors the GBM branch below)
            alert_count = cnnProcess(bucket.c_str(), object_name.c_str(), model_name_cnn_loc, model_path_cnn_loc, jsonl_file_url_loc);
            LOG_INFO("cnn alert_count: ", alert_count, "|max_alert: ", max_alert);
            LOG_DEBUG("cnn detection model: ", model_name_cnn_loc);
            if (alert_count >= max_alert)
            {
                LOG_INFO("alert_count indicates CNN retraining is needed.");
                const char *module_name_cnn_loc = model_key;
                const char *black_base_path_cnnloc = black_base_path;
                char result_json[2048];
                result = updateCnnProcess(module_name_cnn_loc, jsonl_file_url_loc, black_base_path_cnnloc, result_json, max_samples);

                // Parse the JSON result string
                cJSON *root = cJSON_Parse(result_json);
                if (root == NULL)
                {
                    LOG_ERROR("Parse error: ", cJSON_GetErrorPtr());
                }
                else
                {
                    // Read the return code
                    cJSON *return_code = cJSON_GetObjectItem(root, "return_code");
                    if (return_code != NULL && return_code->type == cJSON_Number)
                    {
                        int code = return_code->valueint;
                        // Training succeeded: switch to the new model
                        if (1 == code)
                        {
                            cJSON *new_model = cJSON_GetObjectItem(root, "message");
                            if (new_model != NULL && new_model->valuestring != NULL)
                            {
                                strcpy(model_name_cnn, new_model->valuestring);
                                LOG_DEBUG("new model: ", model_name_cnn);
                                updateGlobalCfg(0, "config_cnn.ini");
                            }
                        }
                    }
                    cJSON_Delete(root);
                }
            }
            else if (alert_count > 0)
            {
                // Report the file to Kafka
                LOG_INFO("alert_count indicates the file should be reported to Kafka.");
                cJSON *root = cJSON_CreateObject();
                if (root != NULL)
                {
                    cJSON_AddNumberToObject(root, "alert_count", alert_count);
                    cJSON_AddStringToObject(root, "bucket", bucket.c_str());
                    cJSON_AddStringToObject(root, "object_name", object_name.c_str());

                    char *json_str = cJSON_Print(root);
                    if (json_str == NULL)
                    {
                        LOG_ERROR("Failed to convert JSON to string");
                        cJSON_Delete(root);
                    }
                    else
                    {
                        //LOG_INFO("Report to Kafka: ", json_str);
                        LOG_INFO("Report to Kafka.");
                        std::string produce_msg = json_str;
                        RdKafka::ErrorCode err = producer->produce(
                            produce_topic_cnn_str,                  // target topic
                            RdKafka::Topic::PARTITION_UA,           // let librdkafka pick the partition
                            RdKafka::Producer::RK_MSG_COPY,         // copy the payload (avoids lifetime issues)
                            const_cast<char *>(produce_msg.data()), // message payload
                            produce_msg.size(),                     // payload length
                            nullptr,                                // key (none)
                            0,                                      // key length (0 when there is no key)
                            0,                                      // timestamp
                            nullptr                                 // per-message opaque
                        );
                        if (err != RdKafka::ERR_NO_ERROR)
                        {
                            LOG_ERROR("Failed to produce message: ", RdKafka::err2str(err));
                            //std::cerr << "Failed to produce message: " << RdKafka::err2str(err) << std::endl;
                        }
                        cJSON_free(json_str);
                        cJSON_Delete(root);
                    }
                }
                else
                {
                    LOG_ERROR("Failed to create the root JSON object");
                }
            }
        }
        else
        {
            alert_count = gbmProcess(bucket.c_str(), object_name.c_str(), model_name_gbm_loc, model_path_gbm_loc, jsonl_file_url_loc);
            LOG_INFO("gbm alert_count: ", alert_count, "|max_alert: ", max_alert);
            LOG_DEBUG("gbm detection model: ", model_name_gbm_loc);
            //std::cout << "gbm alert_count:" << alert_count << std::endl;
            if (alert_count >= max_alert)
            {
                LOG_INFO("alert_count indicates GBM retraining is needed.");
                const char *module_name_gbm_loc = model_name;
                const char *black_base_path_gbmloc = jsonl_content_string;
                char result_json[2048];
                result = updateGbmProcess(module_name_gbm_loc, jsonl_file_url_loc, black_base_path_gbmloc, result_json, threshold);

                // Parse the JSON result string
                cJSON *root = cJSON_Parse(result_json);
                if (root == NULL)
                {
                    LOG_ERROR("Parse error: ", cJSON_GetErrorPtr());
                }
                else
                {
                    // Read the return code
                    cJSON *return_code = cJSON_GetObjectItem(root, "return_code");
                    if (return_code != NULL && return_code->type == cJSON_Number)
                    {
                        int code = return_code->valueint;
                        // Training succeeded: switch to the new model
                        if (1 == code)
                        {
                            cJSON *new_model = cJSON_GetObjectItem(root, "message");
                            if (new_model != NULL && new_model->valuestring != NULL)
                            {
                                strcpy(model_name_gbm, new_model->valuestring);
                                LOG_DEBUG("new model: ", model_name_gbm);
                                updateGlobalCfg(1, "config_gbm.ini");
                            }
                        }
                    }
                    cJSON_Delete(root);
                }
            }
            else if (alert_count > 0)
            {
                // Report the file to Kafka
                LOG_INFO("alert_count indicates the file should be reported to Kafka.");
                cJSON *root = cJSON_CreateObject();
                if (root != NULL)
                {
                    cJSON_AddNumberToObject(root, "alert_count", alert_count);
                    cJSON_AddStringToObject(root, "bucket", bucket.c_str());
                    cJSON_AddStringToObject(root, "object_name", object_name.c_str());

                    char *json_str = cJSON_Print(root);
                    if (json_str == NULL)
                    {
                        LOG_ERROR("Failed to convert JSON to string");
                        cJSON_Delete(root);
                    }
                    else
                    {
                        //LOG_INFO("Report to Kafka: ", json_str);
                        LOG_INFO("Report to Kafka.");
                        std::string produce_msg = json_str;
                        RdKafka::ErrorCode err = producer->produce(
                            produce_topic_gbm_str,                  // target topic
                            RdKafka::Topic::PARTITION_UA,           // let librdkafka pick the partition
                            RdKafka::Producer::RK_MSG_COPY,         // copy the payload (avoids lifetime issues)
                            const_cast<char *>(produce_msg.data()), // message payload
                            produce_msg.size(),                     // payload length
                            nullptr,                                // key (none)
                            0,                                      // key length (0 when there is no key)
                            0,                                      // timestamp
                            nullptr                                 // per-message opaque
                        );
                        if (err != RdKafka::ERR_NO_ERROR)
                        {
                            LOG_ERROR("Failed to produce message: ", RdKafka::err2str(err));
                        }
                        cJSON_free(json_str);
                        cJSON_Delete(root);
                    }
                }
                else
                {
                    LOG_ERROR("Failed to create the root JSON object");
                }
            }
        }
        //PyGILState_Release(state);
    }

    return success ? 0 : 1;
}
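// For reference, the message shapes flowing through this file, reconstructed from
// the cJSON field accesses above and below (the concrete values are only examples):
//
//   consumed by msg_consume():
//     {"bucket": "2025-09-16", "object": "19/output/321.pcap_fj.jsonl", "url": "http://..."}
//
//   produced by kafkaProcess() on produce_topic_cnn_str / produce_topic_gbm_str:
//     {"alert_count": 3, "bucket": "2025-09-16", "object_name": "19/output/321.pcap_fj.jsonl"}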
void msg_consume(RdKafka::Message *msg, void *opaque, int model, RdKafka::Producer *producer)
{
    switch (msg->err())
    {
    case RdKafka::ERR__TIMED_OUT:
    {
        //std::cerr << "Consumer error: " << msg->errstr() << std::endl;
        // Timed out: nothing to consume this round
        break;
    }
    case RdKafka::ERR_NO_ERROR:
    {
        // A message arrived
        LOG_DEBUG(" Message in-> topic:", msg->topic_name(), " partition:[", msg->partition(),
                  "] at offset ", msg->offset(), " key: ", (msg->key() ? *msg->key() : "NULL"),
                  " payload: ", (char *)msg->payload());

        // Parse the JSON payload
        cJSON *root = cJSON_Parse((char *)msg->payload());
        if (root == NULL)
        {
            LOG_ERROR("Parse error: ", cJSON_GetErrorPtr());
        }
        else
        {
            // Extract the bucket / object / url fields
            cJSON *bucket = cJSON_GetObjectItem(root, "bucket");
            cJSON *object = cJSON_GetObjectItem(root, "object");
            cJSON *url = cJSON_GetObjectItem(root, "url");
            if (bucket && bucket->valuestring && object && object->valuestring && url && url->valuestring)
            {
                std::string strBucket = bucket->valuestring;
                std::string strObject = object->valuestring;
                std::string strUrl = url->valuestring;
                kafkaProcess(strBucket, strObject, strUrl, model, producer);
            }
            else
            {
                LOG_ERROR("Message is missing bucket/object/url fields");
            }
            cJSON_Delete(root);
        }
        break;
    }
    default:
    {
        LOG_ERROR("Consumer default error: ", msg->errstr());
        break;
    }
    }
}

void KafkaConsumer::pullMessage(int model, RdKafka::Producer *producer)
{
    // Subscribe to the configured topics
    RdKafka::ErrorCode errorCode = m_consumer->subscribe(m_topicVector);
    if (errorCode != RdKafka::ERR_NO_ERROR)
    {
        LOG_ERROR("subscribe failed: ", RdKafka::err2str(errorCode));
    }
    LOG_INFO("subscribe succeeded: ", RdKafka::err2str(errorCode));

    // Poll and consume messages
    while (true)
    {
        RdKafka::Message *msg = m_consumer->consume(1000);
        msg_consume(msg, NULL, model, producer);
        delete msg;
    }
}

KafkaConsumer::~KafkaConsumer()
{
    m_consumer->close();
    delete m_config;
    delete m_topicConfig;
    delete m_consumer;
    delete m_event_cb;
    delete m_rebalance_cb;
}
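// Minimal usage sketch (kept out of the build with #if 0): how the consumer and the
// alert producer are expected to be wired together. The broker address, group id and
// topic name below are placeholders, and the globals declared at the top of this file
// are assumed to be initialised elsewhere (e.g. from the INI configuration).
#if 0
int example_main()
{
    std::string errstr;

    // Producer used by kafkaProcess() to publish alert JSON messages
    RdKafka::Conf *pconf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    pconf->set("bootstrap.servers", "127.0.0.1:9092", errstr);
    RdKafka::Producer *producer = RdKafka::Producer::create(pconf, errstr);
    if (producer == NULL)
    {
        LOG_ERROR("Create producer failed: ", errstr);
        return 1;
    }

    // Consume the input topic and run the CNN pipeline (model == 0);
    // pullMessage() loops forever.
    std::vector<std::string> topics = { "pcap_jsonl_topic" };
    KafkaConsumer consumer("127.0.0.1:9092", "ai-detect-group", topics, RdKafka::Topic::PARTITION_UA);
    consumer.pullMessage(0, producer);

    delete producer;
    delete pconf;
    return 0;
}
#endif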