"""RabbitMQ consumer that runs SCA analysis on videos announced by MinIO.

Each queue message is parsed as JSON and is expected to carry an S3-style
``Records`` list.  For every record the referenced object is downloaded to
``TEMP_DIR``, passed to ``sca``, and the result is uploaded back to the same
bucket under ``OUTPUT_PREFIX``.
"""
import contextlib
import json
import os
import time
import urllib.parse
import uuid

import pika
from minio import Minio

from sca import sca

# MinIO configuration.  Credentials may be overridden through the environment
# so they do not have to live in source control; the defaults keep the
# previous hard-coded values.
MINIO_HOST = "ai.ronsunny.cn:9000"
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "minioadmin")
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "minioadmin")
MINIO_SECURE = True
BUCKET_NAME = "video-sca"
TEMP_DIR = "tmp/"
# Key prefix under which analysed videos are uploaded.
OUTPUT_PREFIX = "ai/"

# RabbitMQ configuration (credentials overridable through the environment).
RABBITMQ_HOST = "10.10.12.101"
RABBITMQ_PORT = 5672
RABBITMQ_VHOST = "bbit_ai"
RABBITMQ_USER = os.environ.get("RABBITMQ_USER", "ai_lab_iva_sca")
RABBITMQ_PASS = os.environ.get("RABBITMQ_PASS", "123456")
RABBITMQ_QUEUE = "/sca_queue"

# Initialise the MinIO client.
minio_client = Minio(
    MINIO_HOST,
    access_key=MINIO_ACCESS_KEY,
    secret_key=MINIO_SECRET_KEY,
    secure=MINIO_SECURE,
)


# ------------------- Message handling -------------------
def _process_record(record):
    """Download, analyse and re-upload the object described by *record*.

    Args:
        record: One entry of the notification's ``Records`` list; must contain
            ``record["s3"]["bucket"]["name"]`` and
            ``record["s3"]["object"]["key"]``.

    Returns:
        True when the object was analysed and uploaded, False when it was
        skipped because its key already lies under ``OUTPUT_PREFIX``.

    Raises:
        KeyError: If the record lacks the expected ``s3`` fields.
        Exception: Anything raised by the MinIO client or by ``sca``
            propagates to the caller; temporary files are removed first.
    """
    bucket = record["s3"]["bucket"]["name"]
    # S3-style notifications URL-encode the key with spaces written as "+",
    # so plain unquote() would leave "+" in place and the download would miss.
    key = urllib.parse.unquote_plus(record["s3"]["object"]["key"])

    # Results are uploaded into the same bucket.  If the bucket notification
    # is not prefix-filtered, every upload would announce itself and be
    # analysed again, so objects under the output prefix are ignored.
    if key.startswith(OUTPUT_PREFIX):
        print(f"[skip] {key} is already an analysis result")
        return False

    download_path = os.path.join(TEMP_DIR, os.path.basename(key))
    # The analysed file gets a fresh UUID as its name (no extension).
    new_filename = str(uuid.uuid4())

    # Absolute paths: the input is handed to sca as a file:// URL.
    temp_file_path = os.path.abspath(download_path)
    new_file_path = os.path.abspath(os.path.join(TEMP_DIR, new_filename))

    try:
        print(f"[1] 下载文件 {key} 到临时目录 {download_path}")
        minio_client.fget_object(bucket, key, temp_file_path)

        print("[2] AI 分析...")
        sca(["file://" + temp_file_path], new_file_path)
        print(f"[3] AI 分析完成,生成文件 {new_file_path}")

        # Upload the analysed video next to the source, under OUTPUT_PREFIX.
        dest_key = f"{OUTPUT_PREFIX}{new_filename}"
        minio_client.fput_object(bucket, dest_key, new_file_path)
        print(f"[4] 上传分析视频到 MinIO {dest_key}")
    finally:
        # Clean up even when download/analysis/upload failed; either file may
        # not exist yet at that point.
        for path in (temp_file_path, new_file_path):
            with contextlib.suppress(FileNotFoundError):
                os.remove(path)
        print("[5] 临时文件已删除\n")

    return True


def callback(ch, method, properties, body):
    """Handle one queue message: process every record, then acknowledge.

    Args:
        ch: Channel the message arrived on; used to send the ack.
        method: Delivery metadata; supplies ``delivery_tag`` for the ack.
        properties: Message properties (unused).
        body: JSON payload; records are read from its ``Records`` list.

    The message is acknowledged only after all records were handled.  If
    processing raises, the exception propagates and no ack is sent.
    """
    data = json.loads(body)
    for record in data.get("Records", []):
        if _process_record(record):
            print("[6] 本次任务完成\n")

    # Acknowledge the message.
    ch.basic_ack(delivery_tag=method.delivery_tag)


# Connect to RabbitMQ.
credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS)
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host=RABBITMQ_HOST,
        port=RABBITMQ_PORT,
        virtual_host=RABBITMQ_VHOST,
        credentials=credentials,
    )
)
channel = connection.channel()
channel.queue_declare(queue=RABBITMQ_QUEUE, durable=True)
channel.basic_qos(prefetch_count=1)  # handle one file at a time
channel.basic_consume(queue=RABBITMQ_QUEUE, on_message_callback=callback)

print("Waiting for messages...")
# NOTE(review): the callback runs inside the blocking consume loop; a long
# analysis may starve connection heartbeats -- confirm the broker timeout.
try:
    channel.start_consuming()
except KeyboardInterrupt:
    # Ctrl-C: stop consuming cleanly instead of dying with a traceback.
    channel.stop_consuming()
finally:
    # Release the connection on any exit path; it may already be closed if
    # the broker dropped it.
    if connection.is_open:
        connection.close()