98 lines
2.9 KiB
Python
98 lines
2.9 KiB
Python
import pika
|
|
import uuid
|
|
import time
|
|
import os
|
|
import urllib.parse
|
|
from minio import Minio
|
|
import os
|
|
from sca import sca
|
|
|
|
|
|
# MinIO configuration.
# Connection settings can be overridden with an environment variable of the
# same name, so real credentials do not have to be committed to the source.
# The literals are the previous hard-coded values, kept as defaults.
MINIO_HOST = os.environ.get("MINIO_HOST", "ai.ronsunny.cn:9000")
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "minioadmin")
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "minioadmin")
# Parsed as a boolean: "1", "true", "yes" and "on" (any case) enable TLS.
MINIO_SECURE = os.environ.get("MINIO_SECURE", "true").strip().lower() in (
    "1",
    "true",
    "yes",
    "on",
)
BUCKET_NAME = "video-sca"
TEMP_DIR = "tmp/"

# RabbitMQ configuration (same environment-variable override scheme).
RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "10.10.12.101")
RABBITMQ_PORT = int(os.environ.get("RABBITMQ_PORT", "5672"))
RABBITMQ_VHOST = os.environ.get("RABBITMQ_VHOST", "bbit_ai")
RABBITMQ_USER = os.environ.get("RABBITMQ_USER", "ai_lab_iva_sca")
RABBITMQ_PASS = os.environ.get("RABBITMQ_PASS", "123456")
RABBITMQ_QUEUE = os.environ.get("RABBITMQ_QUEUE", "/sca_queue")
|
|
|
|
# Module-level MinIO client; the message handler uses it to download the
# source object and to upload the analysed result.
minio_client = Minio(
    MINIO_HOST,
    secure=MINIO_SECURE,
    secret_key=MINIO_SECRET_KEY,
    access_key=MINIO_ACCESS_KEY,
)
|
|
|
|
# ------------------- 消息处理 -------------------
|
|
def callback(ch, method, properties, body):
    """Handle one queue message describing newly created bucket objects.

    The message body is parsed as JSON and each entry of its ``"Records"``
    list is processed in turn: the referenced object is downloaded into
    ``TEMP_DIR``, passed through ``sca``, and the output is uploaded back to
    the same bucket under the ``ai/`` prefix with a UUID as its name.

    Args:
        ch: Channel the message arrived on; used to acknowledge it.
        method: Delivery metadata; supplies ``delivery_tag`` for the ack.
        properties: Message properties (unused).
        body: Raw message payload containing the JSON event.

    Raises:
        Any exception from JSON parsing, MinIO or ``sca`` propagates to the
        caller. In that case the message is NOT acknowledged, but temporary
        files created for the failing record are still removed.
    """
    import json

    data = json.loads(body)

    # Results are written under this prefix in the same bucket.
    output_prefix = "ai/"

    for record in data.get("Records", []):
        # Extract the bucket name and the object key from the event.
        bucket = record["s3"]["bucket"]["name"]
        key_encoded = record["s3"]["object"]["key"]
        # S3-style notifications form-encode the key (a space arrives as
        # "+"), so unquote_plus is required; plain unquote would leave the
        # "+" in place and the download would target a non-existent object.
        key = urllib.parse.unquote_plus(key_encoded)

        # Guard against a feedback loop: our own uploads land in the same
        # bucket and would otherwise be analysed again if the notification
        # rule is not restricted by prefix.
        if key.startswith(output_prefix):
            print(f"[skip] {key} is already an analysis result")
            continue

        filename = os.path.basename(key)
        if not filename:
            # Key ends with "/" (folder placeholder): nothing to download.
            print(f"[skip] {key} has no file name")
            continue

        temp_file_path = os.path.join(TEMP_DIR, filename)

        # The analysed file is stored under a fresh UUID name.
        new_filename = str(uuid.uuid4())

        # sca is handed a file:// URL, so work with absolute paths.
        input_path = os.path.abspath(temp_file_path)
        output_path = os.path.abspath(os.path.join(TEMP_DIR, new_filename))

        try:
            print(f"[1] 下载文件 {key} 到临时目录 {temp_file_path}")
            minio_client.fget_object(bucket, key, input_path)

            print(f"[2] AI 分析...")
            # NOTE(review): the original comment says this step currently
            # just copies the video to simulate an analysis result - confirm
            # against the sca implementation.
            sca(["file://" + input_path], output_path)
            print(f"[3] AI 分析完成,生成文件 {output_path}")

            # Upload the analysed video next to the source, under ai/.
            dest_key = f"{output_prefix}{new_filename}"
            minio_client.fput_object(bucket, dest_key, output_path)
            print(f"[4] 上传分析视频到 MinIO {dest_key}")
        finally:
            # Always remove the temporary files, even when a step above
            # failed, so repeated failures cannot fill the temp directory.
            for path in (input_path, output_path):
                try:
                    os.remove(path)
                except FileNotFoundError:
                    pass
            print("[5] 临时文件已删除\n")

        print("[6] 本次任务完成\n")

    # Acknowledge once, after every record of the message was processed.
    ch.basic_ack(delivery_tag=method.delivery_tag)
|
|
# Connect to RabbitMQ with the configured credentials.
credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS)
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host=RABBITMQ_HOST,
        port=RABBITMQ_PORT,
        virtual_host=RABBITMQ_VHOST,
        credentials=credentials,
    )
)
channel = connection.channel()
channel.queue_declare(queue=RABBITMQ_QUEUE, durable=True)

# prefetch_count=1: the broker hands this consumer one unacknowledged
# message at a time, so only one file is processed at once.
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue=RABBITMQ_QUEUE, on_message_callback=callback)

print("Waiting for messages...")
try:
    # Blocks until consuming is stopped or an error is raised.
    channel.start_consuming()
except KeyboardInterrupt:
    # Ctrl-C: stop consuming cleanly instead of dying with a traceback.
    channel.stop_consuming()
finally:
    # Release the broker connection on every exit path; the is_open check
    # avoids a second error when the connection is already gone.
    if connection.is_open:
        connection.close()
|