Files
AILab/bbit_ai/app/config/ssDb.py
T
2025-11-05 18:10:00 +08:00

87 lines
2.5 KiB
Python

import logging
import time
from contextlib import contextmanager
from urllib.parse import quote_plus
from langchain_community.utilities import SQLDatabase
from sqlalchemy import create_engine
from sqlalchemy.exc import OperationalError as SQLOperationalError
logger = logging.getLogger("MSSQLPool")
logger.setLevel(logging.INFO)
class MSSQLPool:
"""
SQL Server 连接池封装
"""
def __init__(
self,
user: str,
password: str,
host: str,
database: str,
driver: str = "ODBC Driver 18 for SQL Server",
encrypt: str = "no",
trust_server_certificate: str = "yes",
min_size: int = 1,
max_size: int = 10,
pool_timeout: int = 10,
):
self.user = user
self.password = quote_plus(password) # 处理特殊字符
self.host = host
self.database = database
self.driver = driver
self.encrypt = encrypt
self.trust_server_certificate = trust_server_certificate
self.engine = create_engine(
f"mssql+pyodbc://{self.user}:{self.password}@{self.host}/{self.database}"
f"?driver={self.driver}&Encrypt={self.encrypt}&TrustServerCertificate={self.trust_server_certificate}",
pool_size=min_size,
max_overflow=max_size - min_size,
pool_timeout=pool_timeout,
)
@contextmanager
def getConn(self, retries: int = 2, delay: float = 1.0):
"""
获取连接,带重试
"""
attempt = 0
while attempt <= retries:
try:
with self.engine.connect() as conn:
yield conn
return
except SQLOperationalError as e:
logger.warning(f"数据库连接异常: {e}. 尝试重试 ({attempt+1}/{retries})")
attempt += 1
time.sleep(delay)
except Exception as e:
logger.error(f"SQL执行异常: {e}")
raise
raise SQLOperationalError("无法获取数据库连接,多次重试失败")
mssql_pool = MSSQLPool(
user="f8_read",
password="www.bbitcn.com",
host="f8.api.dev.bbitcn.cn",
database="f8_db_dev",
)
# SQLAlchemy URI
uri = "mssql+pyodbc://f8_read:www.bbitcn.com@f8.api.dev.bbitcn.cn/f8_db_dev?driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=yes&Encrypt=no"
# 建立数据库对象
ssDBLC = SQLDatabase.from_uri(
uri,
include_tables=[
"NONGHU_INFO",
],
schema="dbo", # 显式指定 schema
)