Files

98 lines
2.9 KiB
Python

import pika
import uuid
import time
import os
import urllib.parse
from minio import Minio
import os
from sca import sca
# MinIO 配置
MINIO_HOST = "ai.ronsunny.cn:9000"
MINIO_ACCESS_KEY = "minioadmin"
MINIO_SECRET_KEY = "minioadmin"
MINIO_SECURE = True
BUCKET_NAME = "video-sca"
TEMP_DIR = "tmp/"
# RabbitMQ 配置
RABBITMQ_HOST = "10.10.12.101"
RABBITMQ_PORT = 5672
RABBITMQ_VHOST = "bbit_ai"
RABBITMQ_USER = "ai_lab_iva_sca"
RABBITMQ_PASS = "123456"
RABBITMQ_QUEUE = "/sca_queue"
# 初始化 MinIO 客户端
minio_client = Minio(
MINIO_HOST,
access_key=MINIO_ACCESS_KEY,
secret_key=MINIO_SECRET_KEY,
secure=MINIO_SECURE
)
# ------------------- 消息处理 -------------------
def callback(ch, method, properties, body):
import json
data = json.loads(body)
for record in data.get("Records", []):
# 解析桶名和文件 Key
bucket = record["s3"]["bucket"]["name"]
key_encoded = record["s3"]["object"]["key"]
key = urllib.parse.unquote(key_encoded)
filename = os.path.basename(key)
temp_file_path = os.path.join(TEMP_DIR, filename)
print(f"[1] 下载文件 {key} 到临时目录 {temp_file_path}")
minio_client.fget_object(bucket, key, temp_file_path)
# AI 分析耗时
print(f"[2] AI 分析...")
# 生成分析后文件 UUID 名
new_uuid = str(uuid.uuid4())
new_filename = f"{new_uuid}"
new_file_path = os.path.join(TEMP_DIR, new_filename)
# 转为绝对路径
temp_file_path = os.path.abspath(temp_file_path)
new_file_path = os.path.abspath(new_file_path)
# 这里暂时复制同一个视频模拟分析结果
sca(["file://" + temp_file_path], new_file_path)
print(f"[3] AI 分析完成,生成文件 {new_file_path}")
# 上传分析后视频到 video-sca/ai/
dest_key = f"ai/{new_filename}"
minio_client.fput_object(bucket, dest_key, new_file_path)
print(f"[4] 上传分析视频到 MinIO {dest_key}")
# 删除临时文件
os.remove(temp_file_path)
os.remove(new_file_path)
print("[5] 临时文件已删除\n")
print("[6] 本次任务完成\n")
# 确认消息
ch.basic_ack(delivery_tag=method.delivery_tag)
# 连接 RabbitMQ
# 设置凭证
credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS)
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=RABBITMQ_HOST,
port=RABBITMQ_PORT,
virtual_host=RABBITMQ_VHOST,
credentials=credentials
)
)
channel = connection.channel()
channel.queue_declare(queue=RABBITMQ_QUEUE, durable=True)
channel.basic_qos(prefetch_count=1) # 一次只处理一个文件
channel.basic_consume(queue=RABBITMQ_QUEUE, on_message_callback=callback)
print("Waiting for messages...")
channel.start_consuming()