"""Database access helpers for AI chat profiles, chat history, sessions,
reports, knowledge bases and image/video recognition records.

Each helper borrows a connection from ``pg_pool`` for the duration of a single
call. Write helpers commit explicitly before returning.
"""

import json
from typing import Any, Dict, List

from langchain_postgres import PostgresChatMessageHistory

from config.pgDb import pg_pool
from config.minIO import get_temp_url
import utils.MyUtils as MyUtils

# Table used by PostgresChatMessageHistory to store chat messages.
database_name = "ai_chat_history"

# Fallback text returned when a profile row does not exist.
_DEFAULT_PROMPT = "你是一个乐于助人的AI助手,请保持中文简洁回答用户。"


def _as_dict(value: Any) -> Dict:
    """Coerce a JSON column value into a dict.

    Dicts are returned as-is, JSON text is decoded, and anything else
    (``None``, invalid JSON, non-object JSON) becomes an empty dict.
    """
    if isinstance(value, dict):
        return value
    if isinstance(value, str):
        try:
            parsed = json.loads(value)
        except json.JSONDecodeError:
            return {}
        return parsed if isinstance(parsed, dict) else {}
    return {}


def _load_json(value: Any) -> Any:
    """Decode ``value`` when it is JSON text; otherwise return it unchanged.

    This keeps callers working whether the driver hands back raw text or an
    already-decoded object, and avoids a ``TypeError`` on ``NULL`` columns.
    """
    if isinstance(value, (str, bytes, bytearray)):
        return json.loads(value)
    return value


# ---------------------------------------------------------------------------
# AI profiles
# ---------------------------------------------------------------------------


def get_ai_personality(ai_id: str):
    """Return the ``ai_personality`` column of profile ``ai_id``.

    Returns a built-in default prompt when no such profile row exists.
    """
    with pg_pool.getConn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT ai_personality FROM ai_chat_profiles WHERE id = %s",
                (ai_id,),
            )
            row = cur.fetchone()
    return row[0] if row else _DEFAULT_PROMPT


def get_description(ai_id: str):
    """Return the ``description`` column of profile ``ai_id``.

    Returns the built-in default prompt when no such profile row exists.
    """
    # NOTE(review): the fallback is the same assistant prompt used by
    # get_ai_personality -- confirm that is intended for a description.
    with pg_pool.getConn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT description FROM ai_chat_profiles WHERE id = %s",
                (ai_id,),
            )
            row = cur.fetchone()
    return row[0] if row else _DEFAULT_PROMPT


def get_ai_available_kn_bases(ai_id: str) -> List[str]:
    """Return the ``available_kn_bases`` column of profile ``ai_id``.

    Returns an empty list when the profile does not exist (previously this
    raised ``TypeError`` by indexing ``None``).
    """
    with pg_pool.getConn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT available_kn_bases FROM ai_chat_profiles WHERE id = %s",
                (ai_id,),
            )
            row = cur.fetchone()
    return row[0] if row else []


def get_all_ai_bot(user_id: str, module: str) -> List[Dict]:
    """List the active AI profiles of ``module`` available to ``user_id``.

    The user's ``roles`` are looked up first; a profile matches when its
    ``available_roles`` JSON shares at least one entry with them.
    Returns an empty list when the user does not exist.
    """
    with pg_pool.getConn() as conn:
        with conn.cursor() as cur:
            # Look up the roles of the user.
            cur.execute("SELECT roles FROM users WHERE id = %s", (user_id,))
            role_row = cur.fetchone()
            if not role_row:
                return []  # unknown user
            user_roles = role_row[0]

            # Profiles whose available_roles JSON contains any of the roles.
            cur.execute(
                """
                SELECT id, title, description, welcome_words, ai_personality,
                       available_report_tables, available_kn_bases
                FROM ai_chat_profiles
                WHERE available_module = %s
                  AND is_active = TRUE
                  AND available_roles::jsonb ?| %s
                """,
                (module, user_roles),
            )
            rows = cur.fetchall()

    bots = []
    for (
        id_,
        title,
        description,
        welcome_words,
        ai_personality,
        available_report_tables,
        available_kn_bases,
    ) in rows:
        # The personality may arrive decoded (dict) or as JSON text.
        persona = _as_dict(ai_personality)
        bots.append(
            {
                "id": id_,
                "title": title,
                "description": description,
                "welcome_words": welcome_words,
                "name": persona.get("名字", ""),
                "role": persona.get("性格", ""),
                "service": persona.get("业务", ""),
                "available_report_tables": available_report_tables,
                "available_kn_bases": available_kn_bases,
            }
        )
    return bots


# ---------------------------------------------------------------------------
# Messages
# ---------------------------------------------------------------------------


def insert_message(session_id: str, isAI: bool, content: str):
    """Append ``content`` to the history of ``session_id``.

    The message is stored as an AI message when ``isAI`` is true, otherwise
    as a user message.
    """
    with pg_pool.getConn() as conn:
        history = PostgresChatMessageHistory(
            database_name, session_id, sync_connection=conn
        )
        if isAI:
            history.add_ai_message(content)
        else:
            history.add_user_message(content)


def get_history(session_id: str) -> List[Dict]:
    """Return the messages of ``session_id`` as ``{"type", "content"}`` dicts."""
    with pg_pool.getConn() as conn:
        history = PostgresChatMessageHistory(
            database_name, session_id, sync_connection=conn
        )
        return [
            {"type": msg.type, "content": msg.content} for msg in history.messages
        ]


def get_history_with_time(session_id: str, number: int) -> List[Dict]:
    """Return the ``number`` most recent messages of ``session_id``, newest first.

    Each entry carries ``type``, ``created_at`` (ISO-8601 string) and
    ``content``.
    """
    with pg_pool.getConn() as conn:
        with conn.cursor() as cur:
            # Values are bound as parameters instead of being formatted into
            # the SQL text, so a crafted session id cannot inject SQL.
            cur.execute(
                "SELECT message, created_at FROM ai_chat_history "
                "WHERE session_id = %s ORDER BY created_at DESC LIMIT %s",
                (session_id, int(number)),
            )
            rows = cur.fetchall()

    simplified = []
    for message, created_at in rows:
        simplified.append(
            {
                "type": message.get("type"),
                "created_at": created_at.isoformat(),
                "content": message.get("data", {}).get("content"),
            }
        )
    return simplified


# ---------------------------------------------------------------------------
# Sessions
# ---------------------------------------------------------------------------


def insert_session(
    user_id: str, ai_id: str, session_id: str, session_title: str, available_module
):
    """Create a chat session row with ``created_at``/``updated_at`` set to now."""
    with pg_pool.getConn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "INSERT INTO ai_chat_sessions "
                "(id, user_id, ai_id, title, available_module, created_at, updated_at) "
                "VALUES (%s, %s, %s, %s, %s, NOW(), NOW())",
                (session_id, user_id, ai_id, session_title, available_module),
            )
        conn.commit()


def update_session_updated_at(session_id: str):
    """Set ``updated_at`` of session ``session_id`` to the current time."""
    with pg_pool.getConn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "UPDATE ai_chat_sessions SET updated_at = NOW() WHERE id = %s",
                (session_id,),
            )
        conn.commit()


def get_sessions(user_id: str, available_module: str) -> List[Dict]:
    """List the sessions of ``user_id`` in ``available_module``, most recently updated first."""
    with pg_pool.getConn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT id, title, updated_at "
                "FROM ai_chat_sessions "
                "WHERE user_id = %s AND available_module = %s "
                "ORDER BY updated_at DESC",
                (user_id, available_module),
            )
            sessions = cur.fetchall()
    return [{"id": row[0], "title": row[1], "updated_at": row[2]} for row in sessions]


# ---------------------------------------------------------------------------
# Reports
# ---------------------------------------------------------------------------


def get_reports(user_id: str) -> List[Dict]:
    """List ``id``/``title`` of reports created by ``user_id`` with ``is_masked = TRUE``, newest first."""
    with pg_pool.getConn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT id, title FROM ai_reports "
                "WHERE created_by = %s AND is_masked = TRUE "
                "ORDER BY created_at DESC",
                (user_id,),
            )
            reports = cur.fetchall()
    return [{"id": row[0], "title": row[1]} for row in reports]


def save_report(id: str, user_id: str, title: str, sql: str):
    """Insert a report with ``is_masked = FALSE`` and return its id."""
    with pg_pool.getConn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "INSERT INTO ai_reports "
                "(id, title, sql, created_at, created_by, is_masked) "
                "VALUES (%s, %s, %s, NOW(), %s, FALSE) RETURNING id",
                (id, title, sql, user_id),
            )
            report_id = cur.fetchone()[0]
        conn.commit()
    return report_id


def maked_report(report_id: str, title: str):
    """Set the title of report ``report_id`` and flag it ``is_masked = TRUE``."""
    with pg_pool.getConn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "UPDATE ai_reports SET title = %s, is_masked = TRUE WHERE id = %s",
                (title, report_id),
            )
        conn.commit()


def getSQL(reportId: str):
    """Return the stored SQL of report ``reportId``, or ``""`` when it does not exist."""
    with pg_pool.getConn() as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT sql FROM ai_reports WHERE id = %s", (reportId,))
            row = cur.fetchone()
    return row[0] if row else ""


def get_available_tables_str(aiId: str) -> str:
    """Describe the report tables available to profile ``aiId`` as plain text.

    For every active table listed in the profile's ``available_report_tables``
    the result contains a ``name:description`` line, a CSV-style header, one
    line per active field, and a blank separator line. A fixed message is
    returned when the profile is missing or lists no tables.
    """
    with pg_pool.getConn() as conn:
        with conn.cursor() as cur:
            # 1. Tables the profile is allowed to use.
            cur.execute(
                "SELECT available_report_tables FROM ai_chat_profiles WHERE id = %s",
                (aiId,),
            )
            role_row = cur.fetchone()
            if not role_row:
                return "无数据库表可用"
            available_tables = role_row[0]  # assumed to be a list of table ids
            if not available_tables:
                return "无数据库表可用"

            # 2. One placeholder per id for the IN (...) clause.
            placeholders = ",".join(["%s"] * len(available_tables))
            sql_query = f"""
                SELECT id, name, description
                FROM ai_reports_tables
                WHERE id IN ({placeholders}) AND is_active = TRUE
            """
            cur.execute(sql_query, available_tables)
            tables = cur.fetchall()

            # 3. Active fields of each table.
            parts = []
            for table in tables:
                cur.execute(
                    "SELECT name, type, description FROM ai_reports_fields "
                    "WHERE table_id = %s AND is_active = TRUE",
                    (table[0],),
                )
                columns = cur.fetchall()
                parts.append(f"{table[1]}:{table[2]}\n")
                parts.append("字段名,数据类型,描述\n")
                parts.extend(
                    f"{column[0]},{column[1]}, {column[2]}\n" for column in columns
                )
                parts.append("\n")
    return "".join(parts)


# ------------------- Report data sources ------------------


def get_available_tables() -> List[Dict]:
    """List every row of ``ai_reports_tables`` (active or not)."""
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                "SELECT id, name, description,is_active FROM ai_reports_tables",
            )
            rows = cursor.fetchall()
    return [
        {
            "id": row[0],
            "name": row[1],
            "description": row[2],
            "is_active": row[3],
        }
        for row in rows
    ]


def add_table(name, description, user_id):
    """Insert a report table definition and return the generated id."""
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                INSERT INTO ai_reports_tables (name, description, create_by)
                VALUES (%s, %s, %s)
                RETURNING id
                """,
                (name, description, user_id),
            )
            new_id = cursor.fetchone()[0]  # id produced by RETURNING
        # Commit explicitly, like the other write helpers in this module.
        conn.commit()
    return new_id


def get_fields_by_table_id(table_id) -> List[Dict]:
    """List every field row (active or not) belonging to ``table_id``."""
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                "SELECT id, name, type, description, is_active "
                "FROM ai_reports_fields WHERE table_id = %s",
                (table_id,),
            )
            rows = cursor.fetchall()
    return [
        {
            "id": row[0],
            "name": row[1],
            "type": row[2],
            "description": row[3],
            "is_active": row[4],
        }
        for row in rows
    ]


def add_field(name, type, description, is_active, table_id, user_id):
    """Insert a field definition for ``table_id`` and return the generated id."""
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                "INSERT INTO ai_reports_fields "
                "(name,type,description, is_active, create_by, table_id) "
                "VALUES (%s, %s, %s, %s, %s, %s) RETURNING id",
                (name, type, description, is_active, user_id, table_id),
            )
            new_id = cursor.fetchone()[0]  # id produced by RETURNING
        # Commit explicitly, like the other write helpers in this module.
        conn.commit()
    return new_id


def insert_bot(
    title: str,
    description: str,
    welcome_words: str,
    ai_personality: str,
    available_module: str,
    available_report_tables: str,
    available_kn_bases: str,
    user_id: str,
):
    """Create an AI profile and return its id.

    ``available_roles`` is always stored as the JSON array ``["user"]``.
    """
    available_roles = json.dumps(["user"])
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                INSERT INTO ai_chat_profiles
                    (available_module, available_roles, title, description,
                     welcome_words, ai_personality, available_report_tables,
                     available_kn_bases, created_by, created_at)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, now())
                RETURNING id
                """,
                (
                    available_module,
                    available_roles,
                    title,
                    description,
                    welcome_words,
                    ai_personality,
                    available_report_tables,
                    available_kn_bases,
                    user_id,
                ),
            )
            profile_id = cursor.fetchone()[0]
        # Commit explicitly, like the other write helpers in this module.
        conn.commit()
    return profile_id


def update_bot(
    id: str,
    title: str,
    description: str,
    welcome_words: str,
    ai_personality: str,
    available_module: str,
    available_report_tables: str,
    available_kn_bases: str,
    user_id: str,
):
    """Update the editable columns of AI profile ``id``.

    Also stamps ``updated_at`` with the current time and ``updated_by`` with
    ``user_id``.
    """
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                UPDATE ai_chat_profiles
                SET title = %s,
                    description = %s,
                    ai_personality = %s,
                    welcome_words = %s,
                    available_report_tables = %s,
                    available_kn_bases = %s,
                    available_module = %s,
                    updated_at = NOW(),
                    updated_by = %s
                WHERE id = %s
                """,
                (
                    title,
                    description,
                    ai_personality,
                    welcome_words,
                    available_report_tables,
                    available_kn_bases,
                    available_module,
                    user_id,
                    id,
                ),
            )
        # Commit explicitly, like the other write helpers in this module.
        conn.commit()


# ---------------------------------------------------------------------------
# Knowledge bases
# ---------------------------------------------------------------------------


def get_available_knowledge_bases(available_module: str) -> List[Dict]:
    """List knowledge bases whose ``available_module`` JSON array contains ``available_module``."""
    # json.dumps escapes quotes/backslashes that a hand-built JSON string
    # would not; ensure_ascii=False keeps non-ASCII module names readable.
    module_filter = json.dumps([available_module], ensure_ascii=False)
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                SELECT id, name, description, is_active
                FROM ai_knowledge
                WHERE available_module::jsonb @> %s::jsonb
                """,
                (module_filter,),
            )
            rows = cursor.fetchall()
    return [
        {
            "id": row[0],
            "name": row[1],
            "description": row[2],
            "is_active": row[3],
        }
        for row in rows
    ]


def add_knowledge_base(name, description, user_id):
    """Insert a knowledge base and return the generated id."""
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                INSERT INTO ai_knowledge (name, description, created_by, created_at)
                VALUES (%s, %s, %s, now())
                RETURNING id
                """,
                (name, description, user_id),
            )
            new_id = cursor.fetchone()[0]  # id produced by RETURNING
        # Commit explicitly, like the other write helpers in this module.
        conn.commit()
    return new_id


# ---------------------------------------------------------------------------
# Ticket image recognition
# ---------------------------------------------------------------------------


def get_ticket_image_list(user_id) -> List[Dict]:
    """List the ticket images created by ``user_id``.

    ``size`` is divided by 1024 and rounded to two decimals; ``oss`` is
    turned into a temporary URL in the ``image-ticket`` bucket.
    """
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                SELECT created_at, file_name, resolution, size, name,
                       moisture_content, cocoon_weight, defective_pupa_count,
                       fresh_shell_weight, sample_count, barcode, oss,
                       net_weight_total, evaluator, reviewer, id,
                       dead_pupa_count
                FROM ticket_images
                WHERE created_by = %s
                """,
                (user_id,),
            )
            rows = cursor.fetchall()

    result = []
    for row in rows:
        result.append(
            {
                "created_at": MyUtils.format_datetime(row[0]),
                "file_name": row[1],
                "resolution": row[2],
                "size": round(row[3] / 1024, 2),
                "name": row[4],
                "moisture_content": row[5],
                "cocoon_weight": row[6],
                "defective_pupa_count": row[7],
                "fresh_shell_weight": row[8],
                "sample_count": row[9],
                "barcode": row[10],
                "oss_url": get_temp_url("image-ticket", row[11]),
                "net_weight_total": row[12],
                "evaluator": row[13],
                "reviewer": row[14],
                "id": row[15],
                "dead_pupa_count": row[16],
            }
        )
    return result


def insert_ticket_image(
    created_by,
    file_name,
    resolution,
    size,
    name,
    moisture_content,
    cocoon_weight,
    defective_pupa_count,
    dead_pupa_count,
    fresh_shell_weight,
    sample_count,
    barcode,
    oss,
    net_weight_total,
    evaluator,
    reviewer,
):
    """Insert a ticket image record (``created_at`` = now) and return its id."""
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                INSERT INTO ticket_images (
                    created_by, file_name, resolution, size, name,
                    moisture_content, cocoon_weight, defective_pupa_count,
                    dead_pupa_count, fresh_shell_weight, sample_count,
                    barcode, oss, net_weight_total, evaluator, reviewer,
                    created_at
                )
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
                        %s, %s, %s, NOW())
                RETURNING id
                """,
                (
                    created_by,
                    file_name,
                    resolution,
                    size,
                    name,
                    moisture_content,
                    cocoon_weight,
                    defective_pupa_count,
                    dead_pupa_count,
                    fresh_shell_weight,
                    sample_count,
                    barcode,
                    oss,
                    net_weight_total,
                    evaluator,
                    reviewer,
                ),
            )
            new_id = cursor.fetchone()[0]
        conn.commit()
    return new_id


# ---------------------------------------------------------------------------
# License photo recognition
# ---------------------------------------------------------------------------


def insert_license_image(
    created_by, file_name, resolution, size, name, oss, type, content
):
    """Insert a license image record (``created_at`` = now) and return its id."""
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                INSERT INTO license_images (
                    created_by, created_at, file_name, resolution, size,
                    name, oss, type, content
                )
                VALUES (%s, NOW(), %s, %s, %s, %s, %s, %s, %s)
                RETURNING id
                """,
                (created_by, file_name, resolution, size, name, oss, type, content),
            )
            new_id = cursor.fetchone()[0]
        conn.commit()
    return new_id


def get_license_image_list(user_id, page=1, page_size=10):
    """Return one page of the license images created by ``user_id``.

    Returns:
        A ``(total, items)`` tuple: the total number of matching rows and the
        rows of the requested page, newest first.
    """
    offset = (page - 1) * page_size
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            # 1. Total row count.
            cursor.execute(
                """
                SELECT COUNT(*)
                FROM license_images
                WHERE created_by = %s
                """,
                (user_id,),
            )
            total = cursor.fetchone()[0]

            # 2. Rows of the requested page.
            cursor.execute(
                """
                SELECT created_at, file_name, resolution, size, name, oss,
                       id, type, content
                FROM license_images
                WHERE created_by = %s
                ORDER BY created_at DESC
                LIMIT %s OFFSET %s
                """,
                (user_id, page_size, offset),
            )
            rows = cursor.fetchall()

    result = []
    for row in rows:
        result.append(
            {
                "created_at": MyUtils.format_datetime(row[0]),
                "file_name": row[1],
                "resolution": row[2],
                "size": round(row[3] / 1024, 2),
                "name": row[4],
                "oss_url": get_temp_url("image-license", row[5]),
                "id": row[6],
                "type": row[7],
                "content": row[8],
            }
        )
    return total, result


# ---------------------------------------------------------------------------
# Cocoon quality recognition
# ---------------------------------------------------------------------------


def insert_sca_image(
    file_name,
    resolution,
    size,
    cocoon_count,
    max_confidence,
    min_confidence,
    average_confidence,
    other_info,
    preprocess_time_ms,
    inference_time_ms,
    postprocess_time_ms,
    name,
    image_pre,
    image_after,
    created_by,
):
    """Insert an analysed image record and return its id.

    ``other_info`` is serialised with ``json.dumps`` before being stored;
    ``upload_datetime`` is set to the current time.
    """
    other_info_json = json.dumps(other_info)
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                INSERT INTO sca_images (
                    upload_datetime, file_name, resolution, size,
                    cocoon_count, max_confidence, min_confidence,
                    average_confidence, other_info, preprocess_time_ms,
                    inference_time_ms, postprocess_time_ms, name, image_pre,
                    image_after, created_by
                )
                VALUES (NOW(), %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
                        %s, %s, %s, %s)
                RETURNING id
                """,
                (
                    file_name,
                    resolution,
                    size,
                    cocoon_count,
                    max_confidence,
                    min_confidence,
                    average_confidence,
                    other_info_json,
                    preprocess_time_ms,
                    inference_time_ms,
                    postprocess_time_ms,
                    name,
                    image_pre,
                    image_after,
                    created_by,
                ),
            )
            new_id = cursor.fetchone()[0]
        conn.commit()
    return new_id


def get_sca_image_list(user_id, name, page=1, page_size=10):
    """Return one page of the analysed images created by ``user_id``.

    An empty ``name`` matches every image; otherwise ``name`` is matched as a
    substring of the image name.

    Returns:
        A ``(total, items)`` tuple: the total number of matching rows and the
        rows of the requested page, newest upload first.
    """
    offset = (page - 1) * page_size
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            # 1. Total row count ('%%' is an escaped literal '%').
            cursor.execute(
                """
                SELECT COUNT(*)
                FROM sca_images
                WHERE created_by = %s
                  AND (%s = '' OR name LIKE '%%' || %s || '%%')
                """,
                (user_id, name, name),
            )
            total = cursor.fetchone()[0]

            # 2. Rows of the requested page.
            cursor.execute(
                """
                SELECT id, name, upload_datetime, file_name, image_pre,
                       image_after, resolution, size, cocoon_count,
                       max_confidence, min_confidence, average_confidence,
                       other_info, preprocess_time_ms, inference_time_ms,
                       postprocess_time_ms
                FROM sca_images
                WHERE created_by = %s
                  AND (%s = '' OR name LIKE '%%' || %s || '%%')
                ORDER BY upload_datetime DESC
                LIMIT %s OFFSET %s
                """,
                (user_id, name, name, page_size, offset),
            )
            rows = cursor.fetchall()

    result = []
    for row in rows:
        result.append(
            {
                "id": row[0],
                "name": row[1],
                "upload_datetime": MyUtils.format_datetime(row[2]),
                "file_name": row[3],
                "image_pre": get_temp_url("image-sca", "raw/" + row[4]),
                "image_after": get_temp_url("image-sca", "ai/" + row[5]),
                "resolution": row[6],
                "size": MyUtils.safe_round(row[7] / 1024, 2),
                "cocoon_count": row[8],
                "max_confidence": row[9],
                "min_confidence": row[10],
                "average_confidence": row[11],
                "other_info": row[12],
                "preprocess_time_ms": MyUtils.safe_round(row[13], 4),
                "inference_time_ms": MyUtils.safe_round(row[14], 4),
                "postprocess_time_ms": MyUtils.safe_round(row[15], 4),
            }
        )
    return total, result


def get_sca_video_list(name, page=1, page_size=10):
    """Return one page of the analysed videos.

    An empty ``name`` matches every video; otherwise ``name`` is matched as a
    substring of the video name. The query is not filtered by user.

    Returns:
        A ``(total, items)`` tuple: the total number of matching rows and the
        rows of the requested page, newest first.
    """
    offset = (page - 1) * page_size
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            # 1. Total row count ('%%' is an escaped literal '%').
            cursor.execute(
                """
                SELECT COUNT(*)
                FROM sca_videos
                WHERE (%s = '' OR name LIKE '%%' || %s || '%%')
                """,
                (name, name),
            )
            total = cursor.fetchone()[0]

            # 2. Rows of the requested page.
            cursor.execute(
                """
                SELECT id, name, raw_object_name, ai_object_name, duration,
                       size, video_codec, audio_codec, overall_bit_rate,
                       resolution, sc_analysis_time, sc_analysis_total_count,
                       sc_analysis_max_count, sc_analysis_primary_type,
                       sc_analysis_secondary_type, other_info, created_at
                FROM sca_videos
                WHERE (%s = '' OR name LIKE '%%' || %s || '%%')
                ORDER BY created_at DESC
                LIMIT %s OFFSET %s
                """,
                (name, name, page_size, offset),
            )
            rows = cursor.fetchall()

    result = []
    for row in rows:
        result.append(
            {
                "id": row[0],
                "name": row[1],
                "raw_video_url": get_temp_url("video-sca", "raw/" + row[2]),
                "ai_video_url": get_temp_url("video-sca", "ai/" + row[3]),
                "duration": MyUtils.safe_round(row[4], 2),
                "size_kb": MyUtils.safe_round(row[5] / 1024, 2),
                "video_codec": row[6],
                "audio_codec": row[7],
                "overall_bit_rate": row[8],
                "resolution": row[9],
                "sc_analysis_time": MyUtils.safe_round(row[10], 2),
                "sc_analysis_total_count": row[11],
                "sc_analysis_max_count": row[12],
                "sc_analysis_primary_type": row[13],
                "sc_analysis_secondary_type": row[14],
                # Decode JSON text; NULL or already-decoded values pass through.
                "other_info": _load_json(row[15]),
                "created_at": MyUtils.format_datetime(row[16]),
            }
        )
    return total, result


def get_sca_video_details(v_id) -> List[Dict]:
    """List the analysis detail rows of video ``v_id`` ordered by ``time_stamp``."""
    with pg_pool.getConn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                SELECT id, v_id, time_stamp, other_info
                FROM sca_video_details
                WHERE v_id = %s
                ORDER BY time_stamp ASC
                """,
                (v_id,),
            )
            rows = cursor.fetchall()

    # other_info is returned exactly as the driver provides it; no JSON
    # decoding is done here.
    return [
        {
            "id": row[0],
            "v_id": row[1],
            "time_stamp": row[2],
            "other_info": row[3],
        }
        for row in rows
    ]