from langchain_postgres import PostgresChatMessageHistory from config.pgDb import pg_pool from config.ssDb import mssql_pool from typing import List, Dict import json # ————————————————————————————————————————————————————AI角色——————————————————————————————— database_name = "ai_chat_history" def get_ai_personality(ai_id: str): with pg_pool.getConn() as conn: with conn.cursor() as cur: cur.execute( "SELECT ai_personality FROM ai_chat_profiles WHERE id = %s", (ai_id,) ) row = cur.fetchone() if row: return row[0] else: return "你是一个乐于助人的AI助手,请保持中文简洁回答用户。" def get_description(ai_id: str): with pg_pool.getConn() as conn: with conn.cursor() as cur: cur.execute( "SELECT description FROM ai_chat_profiles WHERE id = %s", (ai_id,) ) row = cur.fetchone() if row: return row[0] else: return "你是一个乐于助人的AI助手,请保持中文简洁回答用户。" def get_ai_available_kn_bases(ai_id: str) -> List[str]: with pg_pool.getConn() as conn: result = conn.execute( "SELECT available_kn_bases FROM ai_chat_profiles WHERE id = %s", (ai_id,) ) return result.fetchone()[0] def get_all_ai_bot(user_id: str, module: str) -> List[Dict]: with pg_pool.getConn() as conn: with conn.cursor() as cur: # 查询用户角色 cur.execute( "SELECT roles FROM users WHERE id = %s", (user_id,) ) role_row = cur.fetchone() if not role_row: return [] # 用户不存在 user_roles = role_row[0] # 查询 AI 角色 JSON 字段包含用户角色 cur.execute( """ SELECT id, title, description, welcome_words, ai_personality, available_report_tables, available_kn_bases FROM ai_chat_profiles WHERE available_module = %s AND is_active = TRUE AND available_roles::jsonb ?| %s """, (module, user_roles) ) rows = cur.fetchall() result = [] for row in rows: # row 索引对应 SELECT 字段顺序 id_, title, description, welcome_words, ai_personality, available_report_tables, available_kn_bases = row # 解析 JSON roles_json = ai_personality if ai_personality else {} result.append({ "id": id_, "title": title, "description": description, "welcome_words": welcome_words, "name": roles_json.get("名字", ""), "role": roles_json.get("性格", ""), "service": roles_json.get("业务", ""), "available_report_tables": available_report_tables, "available_kn_bases": available_kn_bases }) return result # ————————————————————————————————————————————————————消息——————————————————————————————— def insert_message(session_id: str, isAI: bool, content: str): with pg_pool.getConn() as conn: history = PostgresChatMessageHistory( database_name, session_id, sync_connection=conn ) if isAI: history.add_ai_message(content) else: history.add_user_message(content) def get_history(session_id: str): simplified = [] with pg_pool.getConn() as conn: history = PostgresChatMessageHistory( database_name, session_id, sync_connection=conn ) for msg in history.messages: simplified.append({ "type": msg.type, "content": msg.content }) return simplified def get_history_with_time(session_id: str, number: int): simplified = [] with pg_pool.getConn() as conn: with conn.cursor() as cur: cur.execute( f"SELECT message, created_at FROM ai_chat_history WHERE session_id = '{session_id}' ORDER BY created_at DESC LIMIT {number}") rows = cur.fetchall() simplified = [] for row in rows: msg_dict = row[0] simplified.append({ "type": msg_dict.get("type"), "created_at": row[1].isoformat(), "content": msg_dict.get("data", {}).get("content") }) return simplified # ————————————————————————————————————————————————————会话——————————————————————————————— def insert_session(user_id: str, ai_id: str, session_id: str, session_title: str, available_module): with pg_pool.getConn() as coon: with coon.cursor() as cur: cur.execute( "INSERT INTO ai_chat_sessions (id ,user_id, ai_id, title, available_module, created_at, updated_at) VALUES (%s, %s, %s, %s,%s, NOW(), NOW())", (session_id, user_id, ai_id, session_title, available_module) ) coon.commit() def update_session_updated_at(session_id: str): with pg_pool.getConn() as conn: with conn.cursor() as cur: cur.execute( "UPDATE ai_chat_sessions SET updated_at = NOW() WHERE id = %s", (session_id,) ) conn.commit() def get_sessions(user_id: str, available_module: str): with pg_pool.getConn() as conn: with conn.cursor() as cur: cur.execute( "SELECT id, title, updated_at " "FROM ai_chat_sessions " "WHERE user_id = %s AND available_module = %s " "ORDER BY updated_at DESC", (user_id, available_module) ) sessions = cur.fetchall() return [ { "id": row[0], "title": row[1], "updated_at": row[2] } for row in sessions ] # ————————————————————————————————————————————————————报表——————————————————————————————— def get_reports(user_id: str): with pg_pool.getConn() as conn: with conn.cursor() as cur: cur.execute( "SELECT id, title FROM ai_reports WHERE created_by = %s AND is_masked = TRUE ORDER BY created_at DESC", (user_id,) ) reports = cur.fetchall() return [ { "id": row[0], "title": row[1] } for row in reports ] def save_report(id: str, user_id: str, title: str, sql: str): with pg_pool.getConn() as conn: with conn.cursor() as cur: cur.execute( "INSERT INTO ai_reports (id, title, sql, created_at, created_by , is_masked) VALUES (%s, %s, %s, NOW(), %s, FALSE) RETURNING id", (id, title, sql, user_id) ) report_id = cur.fetchone()[0] conn.commit() return report_id def maked_report(report_id: str, title: str): with pg_pool.getConn() as conn: with conn.cursor() as cur: cur.execute( "UPDATE ai_reports SET title = %s, is_masked = TRUE WHERE id = %s", (title, report_id) ) conn.commit() def getSQL(reportId: str): with pg_pool.getConn() as conn: with conn.cursor() as cur: cur.execute( "SELECT sql FROM ai_reports WHERE id = %s", (reportId,) ) row = cur.fetchone() if row: return row[0] else: return "" def get_available_tables_str(aiId: str): with pg_pool.getConn() as conn: with conn.cursor() as cur: # 1. 先取 AI 可用的数据库表 cur.execute( "SELECT available_report_tables FROM ai_chat_profiles WHERE id = %s", (aiId,) ) role_row = cur.fetchone() if not role_row: return "无数据库表可用" available_tables = role_row[0] # 假设是列表 if not available_tables: return "无数据库表可用" # 2. 构造 IN 查询占位符 placeholders = ','.join(['%s'] * len(available_tables)) sql_query = f""" SELECT id, name, description FROM ai_reports_tables WHERE id IN ({placeholders}) AND is_active = TRUE """ cur.execute(sql_query, available_tables) tableIds = cur.fetchall() # 3. 查询这些表的字段 result = "" for table in tableIds: cur.execute( "SELECT name, type, description FROM ai_reports_fields WHERE table_id = %s AND is_active = TRUE", (table[0],) ) columns = cur.fetchall() result += f"{table[1]}:{table[2]}\n" result += "字段名,数据类型,描述\n" for column in columns: result += f"{column[0]},{column[1]}, {column[2]}\n" result += "\n" return result # -------------------报表数据源------------------ # 获取表 def get_available_tables(): with pg_pool.getConn() as conn: with conn.cursor() as cursor: cursor.execute( "SELECT id, name, description,is_active FROM ai_reports_tables", ) return [{"id": row[0], "name": row[1], "description": row[2], "is_active": row[3]} for row in cursor.fetchall()] # 新增表 def add_table(name, description, user_id): with pg_pool.getConn() as conn: with conn.cursor() as cursor: cursor.execute( """ INSERT INTO ai_reports_tables (name, description, create_by) VALUES (%s, %s, %s) RETURNING id """, (name, description, user_id) ) new_id = cursor.fetchone()[0] # 取返回的 id return new_id # 获取字段 def get_fields_by_table_id(table_id): with pg_pool.getConn() as conn: with conn.cursor() as cursor: cursor.execute( "SELECT id, name, type, description, is_active FROM ai_reports_fields WHERE table_id = %s", (table_id,), ) return [{"id": row[0], "name": row[1], "type": row[2], "description": row[3], "is_active": row[4]} for row in cursor.fetchall()] # 新增字段 def add_field(name, type, description, is_active, table_id, user_id): with pg_pool.getConn() as conn: with conn.cursor() as cursor: cursor.execute( "INSERT INTO ai_reports_fields (name,type,description, is_active, create_by, table_id) VALUES (%s, %s, %s, %s, %s, %s) RETURNING id", (name, type, description, is_active, user_id, table_id) ) new_id = cursor.fetchone()[0] # 取返回的 id return new_id # 新增报表智能体 def insert_bot(title: str, description: str, welcome_words: str, ai_personality: str, available_module: str, available_report_tables: str, available_kn_bases: str, user_id: str): with pg_pool.getConn() as conn: with conn.cursor() as cursor: available_roles = json.dumps(['user']) cursor.execute( """ INSERT INTO ai_chat_profiles (available_module,available_roles, title, description, welcome_words, ai_personality, available_report_tables, available_kn_bases, created_by, created_at) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, now()) RETURNING id """, (available_module, available_roles, title, description, welcome_words, ai_personality, available_report_tables, available_kn_bases, user_id) ) report_id = cursor.fetchone()[0] return report_id # 更新报表智能体 def update_bot(id: str, title: str, description: str, welcome_words: str, ai_personality: str, available_module: str, available_report_tables: str, available_kn_bases: str, user_id: str): with pg_pool.getConn() as conn: with conn.cursor() as cursor: cursor.execute(""" UPDATE ai_chat_profiles SET title = %s, description = %s, ai_personality = %s, welcome_words = %s, available_report_tables = %s, available_kn_bases = %s, available_module = %s, updated_at = NOW(), updated_by = %s WHERE id = %s """, (title, description, ai_personality, welcome_words, available_report_tables, available_kn_bases, available_module, user_id, id) ) # ————————————————————————————————————————————————————知识库——————————————————————————————— def get_available_knowledge_bases(available_module: str): with pg_pool.getConn() as conn: with conn.cursor() as cursor: cursor.execute( """ SELECT id, name, description, is_active FROM ai_knowledge WHERE available_module::jsonb @> %s::jsonb """, (f'["{available_module}"]',) ) return [{"id": row[0], "name": row[1], "description": row[2], "is_active": row[3]} for row in cursor.fetchall()] def add_knowledge_base(name, description, user_id): with pg_pool.getConn() as conn: with conn.cursor() as cursor: cursor.execute( """ INSERT INTO ai_knowledge (name, description, created_by, created_at) VALUES (%s, %s, %s, now()) RETURNING id """, (name, description, user_id) ) new_id = cursor.fetchone()[0] # 取返回的 id