Files
AILab/bbit_ai/app/db/postgres.py
T
2025-10-29 13:53:55 +08:00

638 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from typing import List, Dict
from langchain_postgres import PostgresChatMessageHistory
from config.pgDb import pg_pool
# ————————————————————————————————————————————————————AI角色———————————————————————————————
database_name = "ai_chat_history"
def get_ai_personality(ai_id: str):
with pg_pool.getConn() as conn:
with conn.cursor() as cur:
cur.execute(
"SELECT ai_personality FROM ai_chat_profiles WHERE id = %s", (ai_id,)
)
row = cur.fetchone()
if row:
return row[0]
else:
return "你是一个乐于助人的AI助手,请保持中文简洁回答用户。"
def get_description(ai_id: str):
with pg_pool.getConn() as conn:
with conn.cursor() as cur:
cur.execute(
"SELECT description FROM ai_chat_profiles WHERE id = %s", (ai_id,)
)
row = cur.fetchone()
if row:
return row[0]
else:
return "你是一个乐于助人的AI助手,请保持中文简洁回答用户。"
def get_ai_available_kn_bases(ai_id: str) -> List[str]:
with pg_pool.getConn() as conn:
result = conn.execute(
"SELECT available_kn_bases FROM ai_chat_profiles WHERE id = %s", (ai_id,)
)
return result.fetchone()[0]
def get_all_ai_bot(user_id: str, module: str) -> List[Dict]:
with pg_pool.getConn() as conn:
with conn.cursor() as cur:
# 查询用户角色
cur.execute("SELECT roles FROM users WHERE id = %s", (user_id,))
role_row = cur.fetchone()
if not role_row:
return [] # 用户不存在
user_roles = role_row[0]
# 查询 AI 角色 JSON 字段包含用户角色
cur.execute(
"""
SELECT id, title, description, welcome_words, ai_personality, available_report_tables, available_kn_bases
FROM ai_chat_profiles
WHERE available_module = %s
AND is_active = TRUE
AND available_roles::jsonb ?| %s
""",
(module, user_roles),
)
rows = cur.fetchall()
result = []
for row in rows:
# row 索引对应 SELECT 字段顺序
(
id_,
title,
description,
welcome_words,
ai_personality,
available_report_tables,
available_kn_bases,
) = row
# 解析 JSON
roles_json = ai_personality if ai_personality else {}
result.append(
{
"id": id_,
"title": title,
"description": description,
"welcome_words": welcome_words,
"name": roles_json.get("名字", ""),
"role": roles_json.get("性格", ""),
"service": roles_json.get("业务", ""),
"available_report_tables": available_report_tables,
"available_kn_bases": available_kn_bases,
}
)
return result
# ————————————————————————————————————————————————————消息———————————————————————————————
def insert_message(session_id: str, isAI: bool, content: str):
with pg_pool.getConn() as conn:
history = PostgresChatMessageHistory(
database_name, session_id, sync_connection=conn
)
if isAI:
history.add_ai_message(content)
else:
history.add_user_message(content)
def get_history(session_id: str):
simplified = []
with pg_pool.getConn() as conn:
history = PostgresChatMessageHistory(
database_name, session_id, sync_connection=conn
)
for msg in history.messages:
simplified.append({"type": msg.type, "content": msg.content})
return simplified
def get_history_with_time(session_id: str, number: int):
simplified = []
with pg_pool.getConn() as conn:
with conn.cursor() as cur:
cur.execute(
f"SELECT message, created_at FROM ai_chat_history WHERE session_id = '{session_id}' ORDER BY created_at DESC LIMIT {number}"
)
rows = cur.fetchall()
simplified = []
for row in rows:
msg_dict = row[0]
simplified.append(
{
"type": msg_dict.get("type"),
"created_at": row[1].isoformat(),
"content": msg_dict.get("data", {}).get("content"),
}
)
return simplified
# ————————————————————————————————————————————————————会话———————————————————————————————
def insert_session(
user_id: str, ai_id: str, session_id: str, session_title: str, available_module
):
with pg_pool.getConn() as coon:
with coon.cursor() as cur:
cur.execute(
"INSERT INTO ai_chat_sessions (id ,user_id, ai_id, title, available_module, created_at, updated_at) VALUES (%s, %s, %s, %s,%s, NOW(), NOW())",
(session_id, user_id, ai_id, session_title, available_module),
)
coon.commit()
def update_session_updated_at(session_id: str):
with pg_pool.getConn() as conn:
with conn.cursor() as cur:
cur.execute(
"UPDATE ai_chat_sessions SET updated_at = NOW() WHERE id = %s",
(session_id,),
)
conn.commit()
def get_sessions(user_id: str, available_module: str):
with pg_pool.getConn() as conn:
with conn.cursor() as cur:
cur.execute(
"SELECT id, title, updated_at "
"FROM ai_chat_sessions "
"WHERE user_id = %s AND available_module = %s "
"ORDER BY updated_at DESC",
(user_id, available_module),
)
sessions = cur.fetchall()
return [
{"id": row[0], "title": row[1], "updated_at": row[2]}
for row in sessions
]
# ————————————————————————————————————————————————————报表———————————————————————————————
def get_reports(user_id: str):
with pg_pool.getConn() as conn:
with conn.cursor() as cur:
cur.execute(
"SELECT id, title FROM ai_reports WHERE created_by = %s AND is_masked = TRUE ORDER BY created_at DESC",
(user_id,),
)
reports = cur.fetchall()
return [{"id": row[0], "title": row[1]} for row in reports]
def save_report(id: str, user_id: str, title: str, sql: str):
with pg_pool.getConn() as conn:
with conn.cursor() as cur:
cur.execute(
"INSERT INTO ai_reports (id, title, sql, created_at, created_by , is_masked) VALUES (%s, %s, %s, NOW(), %s, FALSE) RETURNING id",
(id, title, sql, user_id),
)
report_id = cur.fetchone()[0]
conn.commit()
return report_id
def maked_report(report_id: str, title: str):
with pg_pool.getConn() as conn:
with conn.cursor() as cur:
cur.execute(
"UPDATE ai_reports SET title = %s, is_masked = TRUE WHERE id = %s",
(title, report_id),
)
conn.commit()
def getSQL(reportId: str):
with pg_pool.getConn() as conn:
with conn.cursor() as cur:
cur.execute("SELECT sql FROM ai_reports WHERE id = %s", (reportId,))
row = cur.fetchone()
if row:
return row[0]
else:
return ""
def get_available_tables_str(aiId: str):
with pg_pool.getConn() as conn:
with conn.cursor() as cur:
# 1. 先取 AI 可用的数据库表
cur.execute(
"SELECT available_report_tables FROM ai_chat_profiles WHERE id = %s",
(aiId,),
)
role_row = cur.fetchone()
if not role_row:
return "无数据库表可用"
available_tables = role_row[0] # 假设是列表
if not available_tables:
return "无数据库表可用"
# 2. 构造 IN 查询占位符
placeholders = ",".join(["%s"] * len(available_tables))
sql_query = f"""
SELECT id, name, description
FROM ai_reports_tables
WHERE id IN ({placeholders}) AND is_active = TRUE
"""
cur.execute(sql_query, available_tables)
tableIds = cur.fetchall()
# 3. 查询这些表的字段
result = ""
for table in tableIds:
cur.execute(
"SELECT name, type, description FROM ai_reports_fields WHERE table_id = %s AND is_active = TRUE",
(table[0],),
)
columns = cur.fetchall()
result += f"{table[1]}{table[2]}\n"
result += "字段名,数据类型,描述\n"
for column in columns:
result += f"{column[0]},{column[1]}, {column[2]}\n"
result += "\n"
return result
# -------------------报表数据源------------------
# 获取表
def get_available_tables():
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"SELECT id, name, description,is_active FROM ai_reports_tables",
)
return [
{
"id": row[0],
"name": row[1],
"description": row[2],
"is_active": row[3],
}
for row in cursor.fetchall()
]
# 新增表
def add_table(name, description, user_id):
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""
INSERT INTO ai_reports_tables (name, description, create_by)
VALUES (%s, %s, %s)
RETURNING id
""",
(name, description, user_id),
)
new_id = cursor.fetchone()[0] # 取返回的 id
return new_id
# 获取字段
def get_fields_by_table_id(table_id):
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"SELECT id, name, type, description, is_active FROM ai_reports_fields WHERE table_id = %s",
(table_id,),
)
return [
{
"id": row[0],
"name": row[1],
"type": row[2],
"description": row[3],
"is_active": row[4],
}
for row in cursor.fetchall()
]
# 新增字段
def add_field(name, type, description, is_active, table_id, user_id):
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"INSERT INTO ai_reports_fields (name,type,description, is_active, create_by, table_id) VALUES (%s, %s, %s, %s, %s, %s) RETURNING id",
(name, type, description, is_active, user_id, table_id),
)
new_id = cursor.fetchone()[0] # 取返回的 id
return new_id
# 新增报表智能体
def insert_bot(
title: str,
description: str,
welcome_words: str,
ai_personality: str,
available_module: str,
available_report_tables: str,
available_kn_bases: str,
user_id: str,
):
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
available_roles = json.dumps(["user"])
cursor.execute(
"""
INSERT INTO ai_chat_profiles
(available_module,available_roles, title, description, welcome_words, ai_personality, available_report_tables, available_kn_bases, created_by, created_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, now())
RETURNING id
""",
(
available_module,
available_roles,
title,
description,
welcome_words,
ai_personality,
available_report_tables,
available_kn_bases,
user_id,
),
)
report_id = cursor.fetchone()[0]
return report_id
# 更新报表智能体
def update_bot(
id: str,
title: str,
description: str,
welcome_words: str,
ai_personality: str,
available_module: str,
available_report_tables: str,
available_kn_bases: str,
user_id: str,
):
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""
UPDATE ai_chat_profiles
SET title = %s,
description = %s,
ai_personality = %s,
welcome_words = %s,
available_report_tables = %s,
available_kn_bases = %s,
available_module = %s,
updated_at = NOW(),
updated_by = %s
WHERE id = %s
""",
(
title,
description,
ai_personality,
welcome_words,
available_report_tables,
available_kn_bases,
available_module,
user_id,
id,
),
)
# ————————————————————————————————————————————————————知识库———————————————————————————————
def get_available_knowledge_bases(available_module: str):
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""
SELECT id, name, description, is_active
FROM ai_knowledge
WHERE available_module::jsonb @> %s::jsonb
""",
(f'["{available_module}"]',),
)
return [
{
"id": row[0],
"name": row[1],
"description": row[2],
"is_active": row[3],
}
for row in cursor.fetchall()
]
def add_knowledge_base(name, description, user_id):
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""
INSERT INTO ai_knowledge (name, description, created_by, created_at)
VALUES (%s, %s, %s, now())
RETURNING id
""",
(name, description, user_id),
)
new_id = cursor.fetchone()[0] # 取返回的 id
return new_id
# ————————————————————————————————————————————————————仪评指标联识别———————————————————————————————
from config.minIO import get_temp_url
import utils.MyUtils as MyUtils
def get_ticket_image_list(user_id):
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""
SELECT created_at, file_name, resolution, size, name,
moisture_content, cocoon_weight, defective_pupa_count,
fresh_shell_weight, sample_count, barcode, oss,
net_weight_total, evaluator, reviewer,id ,dead_pupa_count
FROM ticket_images
WHERE created_by = %s
""",
(user_id,),
)
rows = cursor.fetchall()
result = []
for row in rows:
result.append(
{
"created_at": MyUtils.format_datetime(row[0]),
"file_name": row[1],
"resolution": row[2],
"size": round(row[3] / 1024, 2),
"name": row[4],
"moisture_content": row[5],
"cocoon_weight": row[6],
"defective_pupa_count": row[7],
"fresh_shell_weight": row[8],
"sample_count": row[9],
"barcode": row[10],
"oss_url": get_temp_url("image-ticket", row[11]),
"net_weight_total": row[12],
"evaluator": row[13],
"reviewer": row[14],
"id": row[15],
"dead_pupa_count": row[16],
}
)
return result
def insert_ticket_image(
created_by,
file_name,
resolution,
size,
name,
moisture_content,
cocoon_weight,
defective_pupa_count,
dead_pupa_count,
fresh_shell_weight,
sample_count,
barcode,
oss,
net_weight_total,
evaluator,
reviewer,
):
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""
INSERT INTO ticket_images (
created_by, file_name, resolution, size, name,
moisture_content, cocoon_weight, defective_pupa_count, dead_pupa_count,
fresh_shell_weight, sample_count, barcode, oss,
net_weight_total, evaluator, reviewer, created_at
)
VALUES (%s, %s, %s, %s, %s, %s,
%s, %s, %s, %s, %s,
%s, %s, %s, %s, %s, NOW())
RETURNING id
""",
(
created_by,
file_name,
resolution,
size,
name,
moisture_content,
cocoon_weight,
defective_pupa_count,
dead_pupa_count,
fresh_shell_weight,
sample_count,
barcode,
oss,
net_weight_total,
evaluator,
reviewer,
),
)
new_id = cursor.fetchone()[0]
conn.commit()
return new_id
import json
# ————————————————————————————————————————————————————证件照片识别———————————————————————————————
def insert_license_image(
created_by, file_name, resolution, size, name, oss, type, content
):
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""
INSERT INTO license_images (
created_by, created_at, file_name, resolution, size, name, oss,
type, content
)
VALUES (%s, NOW(), %s, %s, %s, %s, %s, %s, %s )
RETURNING id
""",
(created_by, file_name, resolution, size, name, oss, type, content),
)
new_id = cursor.fetchone()[0]
conn.commit()
return new_id
def get_license_image_list(user_id, page=1, page_size=10):
"""
获取用户已分析图片列表,带分页
"""
offset = (page - 1) * page_size
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
# 1️⃣ 查询总条数
cursor.execute(
"""
SELECT COUNT(*)
FROM license_images
WHERE created_by = %s
""",
(user_id,),
)
total = cursor.fetchone()[0]
# 2️⃣ 查询当前页数据
cursor.execute(
"""
SELECT created_at, file_name, resolution, size, name, oss, id, type, content
FROM license_images
WHERE created_by = %s
ORDER BY created_at DESC
LIMIT %s OFFSET %s
""",
(user_id, page_size, offset),
)
rows = cursor.fetchall()
result = []
for row in rows:
result.append(
{
"created_at": MyUtils.format_datetime(row[0]),
"file_name": row[1],
"resolution": row[2],
"size": round(row[3] / 1024, 2),
"name": row[4],
"oss_url": get_temp_url("image-license", row[5]),
"id": row[6],
"type": row[7],
"content": row[8],
}
)
return total, result