import logging
import time
from contextlib import contextmanager
from urllib.parse import quote_plus

from langchain_community.utilities import SQLDatabase
from sqlalchemy import create_engine
from sqlalchemy.exc import OperationalError as SQLOperationalError

logger = logging.getLogger("MSSQLPool")
logger.setLevel(logging.INFO)


class MSSQLPool:
    """SQL Server connection-pool wrapper built on a SQLAlchemy engine.

    The engine is created once in ``__init__`` using the ``mssql+pyodbc``
    dialect; connections are handed out through :meth:`getConn`.
    """

    def __init__(
        self,
        user: str,
        password: str,
        host: str,
        database: str,
        driver: str = "ODBC Driver 18 for SQL Server",
        encrypt: str = "no",
        trust_server_certificate: str = "yes",
        min_size: int = 1,
        max_size: int = 10,
        pool_timeout: int = 10,
    ):
        """Create the engine and its connection pool.

        Args:
            user: Login name.
            password: Plain-text password; it is URL-quoted before being
                stored on the instance and placed in the connection URL.
            host: Server host (optionally ``host:port``).
            database: Database name.
            driver: ODBC driver name, passed as the ``driver`` query option.
            encrypt: Value of the ``Encrypt`` query option.
            trust_server_certificate: Value of the
                ``TrustServerCertificate`` query option.
            min_size: Passed to ``create_engine`` as ``pool_size``.
            max_size: Upper bound on pooled plus overflow connections.
            pool_timeout: Passed to ``create_engine`` as ``pool_timeout``.
        """
        self.user = user
        # Stored URL-quoted so special characters survive URL parsing.
        self.password = quote_plus(password)
        self.host = host
        self.database = database
        self.driver = driver
        self.encrypt = encrypt
        self.trust_server_certificate = trust_server_certificate

        # The driver name contains spaces and the user may contain reserved
        # characters, so both are quoted when placed in the URL (the stored
        # attributes keep their raw values).
        url = (
            f"mssql+pyodbc://{quote_plus(self.user)}:{self.password}"
            f"@{self.host}/{self.database}"
            f"?driver={quote_plus(self.driver)}&Encrypt={self.encrypt}"
            f"&TrustServerCertificate={self.trust_server_certificate}"
        )
        self.engine = create_engine(
            url,
            pool_size=min_size,
            # A negative max_overflow means "unlimited" to SQLAlchemy's
            # QueuePool, so never let max_size < min_size produce one.
            max_overflow=max(max_size - min_size, 0),
            pool_timeout=pool_timeout,
        )

    def _connect_with_retry(self, retries: int, delay: float):
        """Return a new connection, retrying on ``OperationalError``.

        Makes one initial attempt plus up to ``retries`` retries, sleeping
        ``delay`` seconds between attempts. Re-raises the last
        ``OperationalError`` once all attempts have failed.
        """
        retries = max(retries, 0)  # always make at least one attempt
        last_error = None
        for attempt in range(retries + 1):
            try:
                return self.engine.connect()
            except SQLOperationalError as e:
                last_error = e
                if attempt < retries:
                    logger.warning(
                        "数据库连接异常: %s. 尝试重试 (%d/%d)",
                        e,
                        attempt + 1,
                        retries,
                    )
                    time.sleep(delay)
            except Exception as e:
                logger.error("SQL执行异常: %s", e)
                raise
        logger.error("无法获取数据库连接,多次重试失败")
        # Re-raise the real error: SQLAlchemy's OperationalError cannot be
        # constructed from a bare message string.
        raise last_error

    @contextmanager
    def getConn(self, retries: int = 2, delay: float = 1.0):
        """Yield a pooled connection, retrying acquisition on failure.

        Only *acquiring* the connection is retried. Exceptions raised by the
        caller inside the ``with`` body are logged and propagated unchanged;
        a ``@contextmanager`` generator must yield exactly once, so the body
        cannot be re-run from here.

        Args:
            retries: Number of additional connection attempts after the
                first one fails with ``OperationalError``.
            delay: Seconds to sleep between attempts.

        Raises:
            sqlalchemy.exc.OperationalError: If no connection could be
                obtained after all attempts.
        """
        conn = self._connect_with_retry(retries, delay)
        with conn:  # closes the connection (returns it to the pool) on exit
            try:
                yield conn
            except Exception as e:
                logger.error("SQL执行异常: %s", e)
                raise


# NOTE(review): credentials are hard-coded here and again in `uri` below;
# consider loading them from the environment or a secrets store.
mssql_pool = MSSQLPool(
    user="f8_read",
    password="www.bbitcn.com",
    host="f8.api.dev.bbitcn.cn",
    database="f8_db_dev",
)

# SQLAlchemy URI
uri = "mssql+pyodbc://f8_read:www.bbitcn.com@f8.api.dev.bbitcn.cn/f8_db_dev?driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=yes&Encrypt=no"

# Build the LangChain database object
ssDBLC = SQLDatabase.from_uri(
    uri,
    include_tables=[
        "NONGHU_INFO",
    ],
    schema="dbo",  # specify the schema explicitly
)