import json

from config.minIO import get_temp_url
from config.pgDb import pg_pool
from utils import MyUtils


def _decode_json(value):
    """Decode *value* with ``json.loads`` only when it is still serialized text.

    ``str`` / ``bytes`` / ``bytearray`` inputs are parsed; anything else
    (``None`` for a NULL column, or a dict/list the database driver may have
    already decoded) is returned unchanged instead of raising ``TypeError``.
    """
    if isinstance(value, (str, bytes, bytearray)):
        return json.loads(value)
    return value


def get_sca_video_list(name, page=1, page_size=10):
    """Return one page of rows from ``video_sca``, newest first.

    Args:
        name: Substring filter applied to the ``name`` column. An empty
            string disables the filter and matches every row.
        page: 1-based page number; the SQL offset is
            ``(page - 1) * page_size``.
        page_size: Maximum number of rows returned for the page.

    Returns:
        A ``(total, result)`` tuple. ``total`` is the number of rows matching
        the filter across all pages; ``result`` is a list of dicts, one per
        row on the requested page.
    """
    offset = (page - 1) * page_size

    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            # 1) Total number of rows matching the filter (ignores paging).
            #    "%%" is the escaped form of a literal "%" in a
            #    parameterized query.
            cursor.execute(
                " SELECT COUNT(*) FROM video_sca"
                " WHERE (%s = '' OR name LIKE '%%' || %s || '%%') ",
                (name, name),
            )
            total = cursor.fetchone()[0]

            # 2) Rows for the requested page.
            cursor.execute(
                " SELECT id, name, raw_object_name, ai_object_name, duration,"
                " size, video_codec, audio_codec, overall_bit_rate,"
                " resolution, sc_analysis_time, sc_analysis_total_count,"
                " sc_analysis_max_count, sc_analysis_primary_type,"
                " sc_analysis_secondary_type, other_info, created_at"
                " FROM video_sca"
                " WHERE (%s = '' OR name LIKE '%%' || %s || '%%')"
                " ORDER BY created_at DESC LIMIT %s OFFSET %s ",
                (name, name, page_size, offset),
            )
            rows = cursor.fetchall()

    result = []
    for row in rows:
        # Unpack in the exact column order of the SELECT above.
        (
            video_id,
            video_name,
            raw_object_name,
            ai_object_name,
            duration,
            size,
            video_codec,
            audio_codec,
            overall_bit_rate,
            resolution,
            sc_analysis_time,
            sc_analysis_total_count,
            sc_analysis_max_count,
            sc_analysis_primary_type,
            sc_analysis_secondary_type,
            other_info,
            created_at,
        ) = row

        # Divide only when a size is present so that a NULL column reaches
        # safe_round as None instead of raising TypeError on ``None / 1024``.
        # NOTE(review): assumes MyUtils.safe_round tolerates None, as its
        # name and its use on the other nullable-looking columns suggest.
        size_kb = size / 1024 if size is not None else None

        result.append(
            {
                "id": video_id,
                "name": video_name,
                # Object keys live under the "raw/" and "ai/" prefixes of the
                # "video-sca" bucket.
                "raw_video_url": get_temp_url(
                    "video-sca", "raw/" + raw_object_name
                ),
                "ai_video_url": get_temp_url(
                    "video-sca", "ai/" + ai_object_name
                ),
                "duration": MyUtils.safe_round(duration, 2),
                "size_kb": MyUtils.safe_round(size_kb, 2),
                "video_codec": video_codec,
                "audio_codec": audio_codec,
                "overall_bit_rate": overall_bit_rate,
                "resolution": resolution,
                "sc_analysis_time": MyUtils.safe_round(sc_analysis_time, 2),
                "sc_analysis_total_count": sc_analysis_total_count,
                "sc_analysis_max_count": sc_analysis_max_count,
                "sc_analysis_primary_type": sc_analysis_primary_type,
                "sc_analysis_secondary_type": sc_analysis_secondary_type,
                # Tolerates NULL and values the driver already decoded.
                "other_info": _decode_json(other_info),
                "created_at": MyUtils.format_datetime(created_at),
            }
        )

    return total, result


def get_video_sca_details(v_id):
    """Return the ``video_sca_details`` rows for one video, oldest first.

    Args:
        v_id: Value matched against the ``v_id`` column.

    Returns:
        A list of dicts with the keys ``id``, ``v_id``, ``time_stamp`` and
        ``other_info``, ordered by ``time_stamp`` ascending.
    """
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                " SELECT id, v_id, time_stamp, other_info"
                " FROM video_sca_details"
                " WHERE v_id = %s ORDER BY time_stamp ASC ",
                (v_id,),
            )
            rows = cursor.fetchall()

    # other_info is passed through exactly as the driver returned it; no JSON
    # decoding is performed here.
    # NOTE(review): an earlier comment claimed the value was parsed from a
    # JSON string -- confirm which form callers expect.
    return [
        {
            "id": detail_id,
            "v_id": video_id,
            "time_stamp": time_stamp,
            "other_info": other_info,
        }
        for detail_id, video_id, time_stamp, other_info in rows
    ]