#include #include #include #include #include #include #include #include #include #include #include #include #include #include "my_utils.h" #include "vehicle_event_service.h" #include #include #include #include #include #include #include #include #include #include #include #include #include "ds_yml_parse.h" #include // ======================== 配置区 ======================== // 全局记录存储 RecordStore g_store; // 全局请求队列 RecordQueue g_queue; // 正面摄像头 RTSP 地址映射表 std::unordered_map g_source_front_map; // 侧面摄像头 RTSP 地址映射表 std::unordered_map g_source_side_map; RtspManager rtsp_mgr_side; static std::mutex g_vehicle_mutex; std::atomic stop_flag(false); #include #include bool delete_file(const std::string &file_path) { try { if (std::filesystem::exists(file_path)) { return std::filesystem::remove(file_path); // 成功返回 true } else { std::cout << "file does not exist: " << file_path << std::endl; return false; } } catch (const std::filesystem::filesystem_error &e) { std::cout << "failed to delete file: " << e.what() << std::endl; return false; } } /** * 更新车辆基础信息(跨线前,可多次更新) */ void update_object_base_info(guint source_id, guint64 object_id, const std::string &vehicle_type) { std::lock_guard lk(g_vehicle_mutex); TrackKey key{source_id, object_id}; Record record; if (g_store.get(key, record)) { // 已经过 Line B 上传的不再更新车牌/车型 if (record.crossed_line_b) { return; } // vehicle_type 一般比车牌稳定,可直接覆盖或也做类似策略 if (!vehicle_type.empty()) { record.vehicle_type = vehicle_type; } } else { // 新对象 record.source_id = source_id; record.object_id = object_id; record.vehicle_type = vehicle_type; record.insert_time = std::chrono::steady_clock::now(); } g_store.upsert(key, record); } /** * 触发车辆跨线事件(拍照线 Line A) */ void trigger_line_a_capture(guint source_id, guint64 object_id) { std::lock_guard lk(g_vehicle_mutex); TrackKey key{source_id, object_id}; Record record; if (!g_store.get(key, record)) { // 没有基础信息,不允许触发 return; } if (record.crossed_line_a) { // 已经在 Line A 拍照过,不再重复 return; } // 状态迁移:未拍照 -> 已拍照 record.crossed_line_a = true; // 拍照,但不上传 record.vehicle_image_side = rtsp_mgr_side.capture_frame_by_source(source_id); g_store.upsert(key, record); } /** * 触发车辆跨线事件(上传线 Line B) */ void trigger_line_b_upload(guint source_id, guint64 object_id, const std::string &file_name) { std::lock_guard lk(g_vehicle_mutex); TrackKey key{source_id, object_id}; Record record; if (!g_store.get(key, record)) { // 没有基础信息,不允许触发 return; } if (!record.crossed_line_a) { // 没有经过 Line A,不允许上传 g_store.remove(key); return; } if (record.crossed_line_b) { // 已经在 Line B 上传过,保证只触发一次 g_store.remove(key); return; } record.vehicle_image_front = file_name; // 状态迁移:未上传 -> 已上传 record.crossed_line_b = true; g_store.upsert(key, record); int vehicle_type = ds_parse_group_type(yaml_file_path, "vehicle_type"); bool match = false; switch (vehicle_type) { case 0: match = true; break; case 1: match = (record.vehicle_type == "coupe"); break; case 2: match = (record.vehicle_type == "largevehicle"); break; case 3: match = (record.vehicle_type == "sedan"); break; case 4: match = (record.vehicle_type == "suv"); break; case 5: match = (record.vehicle_type == "truck"); break; case 6: match = (record.vehicle_type == "van"); break; case 7: match = (record.vehicle_type == "largevehicle" || record.vehicle_type == "truck"); break; default: match = false; break; } if (match) { g_queue.push(record); } else { // 删除不符合条件的记录与图片,避免占用空间 g_store.remove(key); } } // ======================================================= /** * MinIO 上传 */ bool upload_minio(const std::string &object_name, const std::string &file_path) { // Create S3 base URL. minio::s3::BaseUrl base_url; base_url.host = "ai.ronsunny.cn"; base_url.port = 9000; base_url.https = true; base_url.region = "China.Chengdu"; // Create credential provider. minio::creds::StaticProvider provider("minioadmin", "minioadmin"); // Create S3 client. minio::s3::Client client(base_url, &provider); std::string bucket_name = "sentinel"; // Check 'asiatrip' bucket exist or not. bool exist; { minio::s3::BucketExistsArgs args; args.bucket = bucket_name; minio::s3::BucketExistsResponse resp = client.BucketExists(args); if (!resp) { std::cout << "unable to do bucket existence check:" << resp.Error() << std::endl; return EXIT_FAILURE; } exist = resp.exist; } if (!exist) { minio::s3::MakeBucketArgs args; args.bucket = bucket_name; minio::s3::MakeBucketResponse resp = client.MakeBucket(args); if (!resp) { std::cout << "unable to create bucket; " << resp.Error() << std::endl; return EXIT_FAILURE; } } minio::s3::UploadObjectArgs args; args.bucket = bucket_name; args.object = object_name; args.filename = file_path; minio::s3::UploadObjectResponse resp = client.UploadObject(args); if (!resp) { std::cout << "unable to upload object; " << resp.Error() << std::endl; return EXIT_FAILURE; } return EXIT_SUCCESS; } // void front_worker_thread() { // std::string device_id = GetDeviceID(); // AmqpClient::Channel::OpenOpts opts; // opts.host = "ai.ronsunny.cn"; // opts.port = 5672; // opts.auth = // AmqpClient::Channel::OpenOpts::BasicAuth("jetson_sentinel", "123456"); // opts.vhost = "sentinel"; // auto channel = AmqpClient::Channel::Open(opts); // std::string queue = "sentinel.front_pic." + device_id; // channel->DeclareQueue(queue, false, true, false, false); // std::string consumer_tag = // channel->BasicConsume(queue, "", true, true, false); // while (!stop_flag) { // AmqpClient::Envelope::ptr_t envelope; // bool received = channel->BasicConsumeMessage(consumer_tag, envelope); // if (received) { // printf("Received message: %s\n", envelope->Message()->Body().c_str()); // std::string recordId = envelope->Message()->Body(); // // 1. 上传正面照到OSS // Record record; // if (!g_store.get_by_id(recordId, record)) { // std::cerr << "No record found for id=" << recordId << std::endl; // continue; // } // if (record.vehicle_image_front.empty()) { // std::cerr << "No front image for record id=" << recordId << // std::endl; continue; // } // auto file_path = // "./temp/" + // std::filesystem::path(record.vehicle_image_front).filename().string(); // auto upload_result = upload_minio( // "vehicle_image_front/" + record.vehicle_image_side, file_path); // if (EXIT_FAILURE == upload_result) { // std::cerr << "Failed to upload image for record id=" << // record.object_id // << std::endl; // continue; // } // // 2. HTTP 请求服务器 // AnalyticsClient client("https://ai.ronsunny.cn:8090/api/public/" // "sentinel-record-analytics"); // std::string err; // nlohmann::json j; // j["Id"] = recordId; // j["DeviceId"] = device_id; // j["vehicleImageFront"] = record.vehicle_image_front; // j["vehicleImageSide"] = record.vehicle_image_side; // if (!client.call(j, err)) { // std::cerr << "Analytics request failed: " << err << "\n"; // } // TrackKey key{record.source_id, record.object_id}; // g_store.remove(key); // } // } // } void side_worker_thread() { while (!stop_flag) { Record record = g_queue.pop(); if (record.vehicle_image_side.empty() || record.vehicle_image_front.empty()) { std::cerr << "No side image for record id=" << record.object_id << std::endl; continue; } auto file_path = "./temp/" + std::filesystem::path(record.vehicle_image_side).filename().string(); if (!std::filesystem::exists(file_path) || std::filesystem::is_directory(file_path)) { std::cerr << "invalid file: " << file_path << std::endl; continue; } auto upload_result = upload_minio( "vehicle_image_side/" + record.vehicle_image_side, file_path); if (EXIT_FAILURE == upload_result) { std::cerr << "Failed to upload image for record id=" << record.object_id << std::endl; continue; } file_path = "./temp/" + std::filesystem::path(record.vehicle_image_front).filename().string(); if (!std::filesystem::exists(file_path) || std::filesystem::is_directory(file_path)) { std::cerr << "invalid file: " << file_path << std::endl; continue; } upload_result = upload_minio( "vehicle_image_front/" + record.vehicle_image_front, file_path); if (EXIT_FAILURE == upload_result) { std::cerr << "Failed to upload image for record id=" << record.object_id << std::endl; continue; } // 2. HTTP 请求服务器 AnalyticsClient client("https://ai.ronsunny.cn:8090/api/public/" "sentinel-record-analytics"); std::string err; std::string id = generateUUID(); record.id = id; nlohmann::json j; j["Id"] = id; j["DeviceId"] = GetDeviceID(); j["VehicleType"] = record.vehicle_type; j["vehicleImageFront"] = record.vehicle_image_front; j["vehicleImageSide"] = record.vehicle_image_side; if (!client.call(j, err)) { std::cerr << "Analytics request failed: " << err << "\n"; } // 删除本地文件和记录 TrackKey key{record.source_id, record.object_id}; g_store.remove(key); } }