Files
AILab/bbit_ai/app/db/postgres/report.py
T
2025-12-08 18:11:48 +08:00

234 lines
7.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from config.pgDb import pg_pool
def get_reports(user_id: str):
with pg_pool.getConn() as conn:
with conn.cursor() as cur:
cur.execute(
"SELECT id, title FROM ai_reports WHERE created_by = %s AND is_masked = TRUE ORDER BY created_at DESC",
(user_id,),
)
reports = cur.fetchall()
return [{"id": row[0], "title": row[1]} for row in reports]
def save_report(id: str, user_id: str, title: str, sql: str):
with pg_pool.getConn() as conn:
with conn.cursor() as cur:
cur.execute(
"INSERT INTO ai_reports (id, title, sql, created_at, created_by , is_masked) VALUES (%s, %s, %s, NOW(), %s, FALSE) RETURNING id",
(id, title, sql, user_id),
)
report_id = cur.fetchone()[0]
conn.commit()
return report_id
def maked_report(report_id: str, title: str):
with pg_pool.getConn() as conn:
with conn.cursor() as cur:
cur.execute(
"UPDATE ai_reports SET title = %s, is_masked = TRUE WHERE id = %s",
(title, report_id),
)
conn.commit()
def getSQL(reportId: str):
with pg_pool.getConn() as conn:
with conn.cursor() as cur:
cur.execute("SELECT sql FROM ai_reports WHERE id = %s", (reportId,))
row = cur.fetchone()
if row:
return row[0]
else:
return ""
def get_available_tables_str(aiId: str):
with pg_pool.getConn() as conn:
with conn.cursor() as cur:
# 1. 先取 AI 可用的数据库表
cur.execute(
"SELECT available_report_tables FROM ai_chat_profiles WHERE id = %s",
(aiId,),
)
role_row = cur.fetchone()
if not role_row:
return "无数据库表可用"
available_tables = role_row[0] # 假设是列表
if not available_tables:
return "无数据库表可用"
# 2. 构造 IN 查询占位符
placeholders = ",".join(["%s"] * len(available_tables))
sql_query = f"""
SELECT id, name, description
FROM ai_reports_tables
WHERE id IN ({placeholders}) AND is_active = TRUE
"""
cur.execute(sql_query, available_tables)
tableIds = cur.fetchall()
# 3. 查询这些表的字段
result = ""
for table in tableIds:
cur.execute(
"SELECT name, type, description FROM ai_reports_fields WHERE table_id = %s AND is_active = TRUE",
(table[0],),
)
columns = cur.fetchall()
result += f"{table[1]}{table[2]}\n"
result += "字段名,数据类型,描述\n"
for column in columns:
result += f"{column[0]},{column[1]}, {column[2]}\n"
result += "\n"
return result
# -------------------报表数据源------------------
# 获取表
def get_available_tables():
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"SELECT id, name, description,is_active FROM ai_reports_tables",
)
return [
{
"id": row[0],
"name": row[1],
"description": row[2],
"is_active": row[3],
}
for row in cursor.fetchall()
]
# 新增表
def add_table(name, description, user_id):
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""
INSERT INTO ai_reports_tables (name, description, create_by)
VALUES (%s, %s, %s)
RETURNING id
""",
(name, description, user_id),
)
new_id = cursor.fetchone()[0] # 取返回的 id
return new_id
# 获取字段
def get_fields_by_table_id(table_id):
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"SELECT id, name, type, description, is_active FROM ai_reports_fields WHERE table_id = %s",
(table_id,),
)
return [
{
"id": row[0],
"name": row[1],
"type": row[2],
"description": row[3],
"is_active": row[4],
}
for row in cursor.fetchall()
]
# 新增字段
def add_field(name, type, description, is_active, table_id, user_id):
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"INSERT INTO ai_reports_fields (name,type,description, is_active, create_by, table_id) VALUES (%s, %s, %s, %s, %s, %s) RETURNING id",
(name, type, description, is_active, user_id, table_id),
)
new_id = cursor.fetchone()[0] # 取返回的 id
return new_id
# 新增报表智能体
def insert_bot(
title: str,
description: str,
welcome_words: str,
ai_personality: str,
available_module: str,
available_report_tables: str,
available_kn_bases: str,
user_id: str,
):
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
available_roles = json.dumps(["user"])
cursor.execute(
"""
INSERT INTO ai_chat_profiles
(available_module,available_roles, title, description, welcome_words, ai_personality, available_report_tables, available_kn_bases, created_by, created_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, now())
RETURNING id
""",
(
available_module,
available_roles,
title,
description,
welcome_words,
ai_personality,
available_report_tables,
available_kn_bases,
user_id,
),
)
report_id = cursor.fetchone()[0]
return report_id
# 更新报表智能体
def update_bot(
id: str,
title: str,
description: str,
welcome_words: str,
ai_personality: str,
available_module: str,
available_report_tables: str,
available_kn_bases: str,
user_id: str,
):
with pg_pool.getConn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""
UPDATE ai_chat_profiles
SET title = %s,
description = %s,
ai_personality = %s,
welcome_words = %s,
available_report_tables = %s,
available_kn_bases = %s,
available_module = %s,
updated_at = NOW(),
updated_by = %s
WHERE id = %s
""",
(
title,
description,
ai_personality,
welcome_words,
available_report_tables,
available_kn_bases,
available_module,
user_id,
id,
),
)