Files
Sentinel/src/service/vehicle_event_service.cpp
T
2026-05-26 11:46:24 +08:00

363 lines
11 KiB
C++

#include <curl/curl.h>
#include <gst/app/gstappsink.h>
#include <gst/gst.h>
#include <fstream>
#include <iostream>
#include <mutex>
#include <string>
#include <condition_variable>
#include <list>
#include <mutex>
#include <queue>
#include <thread>
#include <unordered_map>
#include "my_utils.h"
#include "vehicle_event_service.h"
#include <filesystem>
#include <optional>
#include <atomic>
#include <chrono>
#include <filesystem>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <miniocpp/client.h>
#include "ds_yml_parse.h"
#include <SimpleAmqpClient/SimpleAmqpClient.h>
// ======================== Configuration / shared state ========================
// Global record store, keyed by TrackKey (source id + object id) in this file.
RecordStore g_store;
// Global request queue: trigger_line_b_upload() pushes records onto it and
// side_worker_thread() pops them.
RecordQueue g_queue;
// RTSP address map for the front cameras.
std::unordered_map<int, std::string> g_source_front_map;
// RTSP address map for the side cameras.
std::unordered_map<int, std::string> g_source_side_map;
// Side-camera RTSP manager; trigger_line_a_capture() asks it for a frame of a
// given source.
RtspManager rtsp_mgr_side;
// Locked by update_object_base_info() and the two trigger_line_* functions
// around their get/modify/upsert sequence on g_store.
static std::mutex g_vehicle_mutex;
// Loop condition of side_worker_thread(); the loop ends once this reads true.
std::atomic<bool> stop_flag(false);
#include <algorithm>
#include <string>
/**
 * Deletes the file at `file_path` if it exists.
 *
 * A missing file and any std::filesystem error are both reported on stdout
 * and yield `false`; filesystem errors are caught, never propagated.
 *
 * @param file_path Path of the file to delete.
 * @return The result of std::filesystem::remove() when the path exists,
 *         `false` otherwise.
 */
bool delete_file(const std::string &file_path) {
  bool removed = false;
  try {
    if (!std::filesystem::exists(file_path)) {
      std::cout << "file does not exist: " << file_path << std::endl;
    } else {
      removed = std::filesystem::remove(file_path);
    }
  } catch (const std::filesystem::filesystem_error &e) {
    std::cout << "failed to delete file: " << e.what() << std::endl;
    removed = false;
  }
  return removed;
}
/**
 * Creates or refreshes the base information of a tracked vehicle. May be
 * called repeatedly before the vehicle crosses a line.
 *
 * - Unknown object: a new record is stored with the given ids, the given
 *   vehicle type and the current steady-clock time as insert time.
 * - Known object already flagged `crossed_line_b`: nothing is changed.
 * - Known object otherwise: the vehicle type is overwritten only when the new
 *   value is non-empty; the record is written back either way.
 *
 * The whole sequence runs under g_vehicle_mutex.
 *
 * @param source_id    Id of the source the object belongs to.
 * @param object_id    Id of the tracked object.
 * @param vehicle_type Vehicle type label; may be empty.
 */
void update_object_base_info(guint source_id, guint64 object_id,
                             const std::string &vehicle_type) {
  std::lock_guard<std::mutex> guard(g_vehicle_mutex);

  TrackKey key{source_id, object_id};
  Record entry;

  if (!g_store.get(key, entry)) {
    // First sighting of this object: initialise every base field.
    entry.source_id = source_id;
    entry.object_id = object_id;
    entry.vehicle_type = vehicle_type;
    entry.insert_time = std::chrono::steady_clock::now();
  } else {
    // Records that already passed Line B are frozen.
    if (entry.crossed_line_b) {
      return;
    }
    // Never replace a known type with an empty one.
    if (!vehicle_type.empty()) {
      entry.vehicle_type = vehicle_type;
    }
  }

  g_store.upsert(key, entry);
}
/**
 * Handles a vehicle crossing the capture line (Line A).
 *
 * Does nothing when the object has no stored record or was already flagged
 * `crossed_line_a`. Otherwise it requests a frame for `source_id` from
 * rtsp_mgr_side, stores the returned value as the record's side image, sets
 * `crossed_line_a` and writes the record back. Nothing is uploaded here.
 *
 * Runs entirely under g_vehicle_mutex, including the frame request.
 *
 * @param source_id Id of the source the object belongs to.
 * @param object_id Id of the tracked object.
 */
void trigger_line_a_capture(guint source_id, guint64 object_id) {
  std::lock_guard<std::mutex> guard(g_vehicle_mutex);

  TrackKey key{source_id, object_id};
  Record entry;

  // Requires existing base info and allows exactly one capture per object.
  const bool eligible = g_store.get(key, entry) && !entry.crossed_line_a;
  if (!eligible) {
    return;
  }

  entry.vehicle_image_side = rtsp_mgr_side.capture_frame_by_source(source_id);
  entry.crossed_line_a = true;  // state change: not captured -> captured
  g_store.upsert(key, entry);
}
/**
 * Handles a vehicle crossing the upload line (Line B).
 *
 * Flow, all under g_vehicle_mutex:
 *  1. No stored record for the object: return.
 *  2. Record never crossed Line A, or already crossed Line B: remove it from
 *     the store and return.
 *  3. Otherwise store `file_name` as the front image, set `crossed_line_b`
 *     and write the record back.
 *  4. Read the "vehicle_type" filter via ds_parse_group_type() and compare it
 *     with the record's vehicle type:
 *       0 -> every type is accepted
 *       1 -> "coupe"         2 -> "largevehicle"   3 -> "sedan"
 *       4 -> "suv"           5 -> "truck"          6 -> "van"
 *       7 -> "largevehicle" or "truck"
 *       anything else -> nothing is accepted
 *  5. Accepted records are pushed onto g_queue; rejected ones are removed
 *     from the store. Only the store entry is removed; this function does not
 *     delete any image file.
 *
 * @param source_id Id of the source the object belongs to.
 * @param object_id Id of the tracked object.
 * @param file_name Front image name to attach to the record.
 */
void trigger_line_b_upload(guint source_id, guint64 object_id,
                           const std::string &file_name) {
  std::lock_guard<std::mutex> guard(g_vehicle_mutex);

  TrackKey key{source_id, object_id};
  Record entry;
  if (!g_store.get(key, entry)) {
    // No base information: the event is ignored.
    return;
  }

  // Upload requires a prior Line A capture and must fire only once.
  if (!entry.crossed_line_a || entry.crossed_line_b) {
    g_store.remove(key);
    return;
  }

  entry.vehicle_image_front = file_name;
  entry.crossed_line_b = true;  // state change: not uploaded -> uploaded
  g_store.upsert(key, entry);

  // Filter values 1..6 each select exactly one vehicle type.
  static const char *const kSingleTypeFilters[] = {
      "coupe", "largevehicle", "sedan", "suv", "truck", "van"};

  int filter = ds_parse_group_type(yaml_file_path, "vehicle_type");
  const std::string &type = entry.vehicle_type;

  bool accepted = false;
  if (filter == 0) {
    accepted = true;
  } else if (filter == 7) {
    accepted = (type == "largevehicle" || type == "truck");
  } else if (filter >= 1 && filter <= 6) {
    accepted = (type == kSingleTypeFilters[filter - 1]);
  }

  if (!accepted) {
    // Filtered out: drop the store entry.
    g_store.remove(key);
    return;
  }
  g_queue.push(entry);
}
// =======================================================
/**
* MinIO 上传
*/
bool upload_minio(const std::string &object_name,
const std::string &file_path) {
// Create S3 base URL.
minio::s3::BaseUrl base_url;
base_url.host = "ai.ronsunny.cn";
base_url.port = 9000;
base_url.https = true;
base_url.region = "China.Chengdu";
// Create credential provider.
minio::creds::StaticProvider provider("minioadmin", "minioadmin");
// Create S3 client.
minio::s3::Client client(base_url, &provider);
std::string bucket_name = "sentinel";
// Check 'asiatrip' bucket exist or not.
bool exist;
{
minio::s3::BucketExistsArgs args;
args.bucket = bucket_name;
minio::s3::BucketExistsResponse resp = client.BucketExists(args);
if (!resp) {
std::cout << "unable to do bucket existence check:" << resp.Error()
<< std::endl;
return EXIT_FAILURE;
}
exist = resp.exist;
}
if (!exist) {
minio::s3::MakeBucketArgs args;
args.bucket = bucket_name;
minio::s3::MakeBucketResponse resp = client.MakeBucket(args);
if (!resp) {
std::cout << "unable to create bucket; " << resp.Error() << std::endl;
return EXIT_FAILURE;
}
}
minio::s3::UploadObjectArgs args;
args.bucket = bucket_name;
args.object = object_name;
args.filename = file_path;
minio::s3::UploadObjectResponse resp = client.UploadObject(args);
if (!resp) {
std::cout << "unable to upload object; " << resp.Error() << std::endl;
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
// void front_worker_thread() {
// std::string device_id = GetDeviceID();
// AmqpClient::Channel::OpenOpts opts;
// opts.host = "ai.ronsunny.cn";
// opts.port = 5672;
// opts.auth =
// AmqpClient::Channel::OpenOpts::BasicAuth("jetson_sentinel", "123456");
// opts.vhost = "sentinel";
// auto channel = AmqpClient::Channel::Open(opts);
// std::string queue = "sentinel.front_pic." + device_id;
// channel->DeclareQueue(queue, false, true, false, false);
// std::string consumer_tag =
// channel->BasicConsume(queue, "", true, true, false);
// while (!stop_flag) {
// AmqpClient::Envelope::ptr_t envelope;
// bool received = channel->BasicConsumeMessage(consumer_tag, envelope);
// if (received) {
// printf("Received message: %s\n", envelope->Message()->Body().c_str());
// std::string recordId = envelope->Message()->Body();
// // 1. 上传正面照到OSS
// Record record;
// if (!g_store.get_by_id(recordId, record)) {
// std::cerr << "No record found for id=" << recordId << std::endl;
// continue;
// }
// if (record.vehicle_image_front.empty()) {
// std::cerr << "No front image for record id=" << recordId <<
// std::endl; continue;
// }
// auto file_path =
// "./temp/" +
// std::filesystem::path(record.vehicle_image_front).filename().string();
// auto upload_result = upload_minio(
// "vehicle_image_front/" + record.vehicle_image_side, file_path);
// if (EXIT_FAILURE == upload_result) {
// std::cerr << "Failed to upload image for record id=" <<
// record.object_id
// << std::endl;
// continue;
// }
// // 2. HTTP 请求服务器
// AnalyticsClient client("https://ai.ronsunny.cn:8090/api/public/"
// "sentinel-record-analytics");
// std::string err;
// nlohmann::json j;
// j["Id"] = recordId;
// j["DeviceId"] = device_id;
// j["vehicleImageFront"] = record.vehicle_image_front;
// j["vehicleImageSide"] = record.vehicle_image_side;
// if (!client.call(j, err)) {
// std::cerr << "Analytics request failed: " << err << "\n";
// }
// TrackKey key{record.source_id, record.object_id};
// g_store.remove(key);
// }
// }
// }
void side_worker_thread() {
while (!stop_flag) {
Record record = g_queue.pop();
if (record.vehicle_image_side.empty() ||
record.vehicle_image_front.empty()) {
std::cerr << "No side image for record id=" << record.object_id
<< std::endl;
continue;
}
auto file_path =
"./temp/" +
std::filesystem::path(record.vehicle_image_side).filename().string();
if (!std::filesystem::exists(file_path) ||
std::filesystem::is_directory(file_path)) {
std::cerr << "invalid file: " << file_path << std::endl;
continue;
}
auto upload_result = upload_minio(
"vehicle_image_side/" + record.vehicle_image_side, file_path);
if (EXIT_FAILURE == upload_result) {
std::cerr << "Failed to upload image for record id=" << record.object_id
<< std::endl;
continue;
}
file_path =
"./temp/" +
std::filesystem::path(record.vehicle_image_front).filename().string();
if (!std::filesystem::exists(file_path) ||
std::filesystem::is_directory(file_path)) {
std::cerr << "invalid file: " << file_path << std::endl;
continue;
}
upload_result = upload_minio(
"vehicle_image_front/" + record.vehicle_image_front, file_path);
if (EXIT_FAILURE == upload_result) {
std::cerr << "Failed to upload image for record id=" << record.object_id
<< std::endl;
continue;
}
// 2. HTTP 请求服务器
AnalyticsClient client("https://ai.ronsunny.cn:8090/api/public/"
"sentinel-record-analytics");
std::string err;
std::string id = generateUUID();
record.id = id;
nlohmann::json j;
j["Id"] = id;
j["DeviceId"] = GetDeviceID();
j["VehicleType"] = record.vehicle_type;
j["vehicleImageFront"] = record.vehicle_image_front;
j["vehicleImageSide"] = record.vehicle_image_side;
if (!client.call(j, err)) {
std::cerr << "Analytics request failed: " << err << "\n";
}
// 删除本地文件和记录
TrackKey key{record.source_id, record.object_id};
g_store.remove(key);
}
}