import logging
import time
from contextlib import ExitStack, contextmanager
from typing import Callable, Iterator, Optional, Union

import psycopg
from psycopg_pool import ConnectionPool

logger = logging.getLogger("PGPool")
logger.setLevel(logging.INFO)


class PGPool:
    """Thin wrapper around a psycopg ``ConnectionPool``.

    The wrapper builds the pool in ``__init__`` and exposes ``getConn`` (also
    available as ``get_conn``), a context manager that checks a connection
    out of the pool, retrying the checkout when it fails with
    ``psycopg.OperationalError``.
    """

    def __init__(
        self,
        uri: str,
        min_size: int = 1,
        max_size: int = 20,
        max_idle: int = 30,
        max_lifetime: int = 300,
        timeout: int = 10,
        check: Union[bool, Callable[..., None]] = False,
    ):
        """Create and open the underlying connection pool.

        :param uri: PostgreSQL connection URI.
        :param min_size: forwarded to ``ConnectionPool(min_size=...)``.
        :param max_size: forwarded to ``ConnectionPool(max_size=...)``.
        :param max_idle: forwarded to ``ConnectionPool(max_idle=...)``.
        :param max_lifetime: forwarded to ``ConnectionPool(max_lifetime=...)``.
        :param timeout: forwarded to ``ConnectionPool(timeout=...)``.
        :param check: ``False`` (default) disables the per-checkout check,
            ``True`` enables the pool's built-in
            ``ConnectionPool.check_connection``; a callable taking the
            connection is forwarded to the pool unchanged.
        """
        self.uri = uri

        # ConnectionPool(check=...) expects a callable (or None), not a bool:
        # forwarding True verbatim would make the pool call ``True(conn)``.
        check_callback: Optional[Callable[..., None]]
        if check is True:
            check_callback = ConnectionPool.check_connection
        elif callable(check):
            check_callback = check
        else:
            check_callback = None

        self.pool = ConnectionPool(
            self.uri,
            min_size=min_size,
            max_size=max_size,
            max_idle=max_idle,
            max_lifetime=max_lifetime,
            timeout=timeout,
            check=check_callback,
            # Opening in the constructor is what the implicit default did;
            # stating it explicitly keeps that behaviour stable.
            open=True,
        )

    def _acquire(self, stack: ExitStack, retries: int, delay: float):
        """Check a connection out of the pool, retrying on OperationalError.

        The pool's connection context is entered on ``stack`` so the caller's
        ``ExitStack`` returns the connection to the pool when it unwinds.
        """
        last_error: Optional[BaseException] = None
        for attempt in range(retries + 1):
            try:
                return stack.enter_context(self.pool.connection())
            except psycopg.OperationalError as e:
                last_error = e
                will_retry = attempt < retries
                if will_retry:
                    logger.warning(f"数据库连接异常: {e}. 尝试重试 ({attempt+1}/{retries})")
                else:
                    logger.warning(f"数据库连接异常: {e}")
                self.pool.check()  # drop broken idle connections and rebuild them
                if will_retry:
                    # Only wait when another attempt actually follows.
                    time.sleep(delay)
        raise psycopg.OperationalError("无法获取数据库连接,多次重试失败") from last_error

    @contextmanager
    def getConn(self, retries: int = 2, delay: float = 1.0) -> Iterator[psycopg.Connection]:
        """Yield a pooled connection with autocommit enabled.

        Only the checkout is retried (up to ``retries`` extra attempts,
        sleeping ``delay`` seconds between attempts). An exception raised by
        the caller inside the ``with`` body is logged and re-raised as-is: a
        ``@contextmanager`` generator must not yield a second time after an
        exception has been thrown into it, so the body cannot be re-run here.

        Usage::

            with pg_pool.get_conn() as conn:
                with conn.cursor() as cur:
                    cur.execute(...)

        :param retries: number of additional checkout attempts after the first.
        :param delay: seconds to sleep between checkout attempts.
        :raises psycopg.OperationalError: when no connection could be obtained
            after all attempts, or when the ``with`` body raised it.
        """
        with ExitStack() as stack:
            conn = self._acquire(stack, retries, delay)
            conn.autocommit = True
            try:
                yield conn
            except psycopg.OperationalError as e:
                logger.warning(f"数据库连接异常: {e}")
                self.pool.check()  # drop broken idle connections and rebuild them
                raise
            except Exception as e:
                logger.error(f"SQL执行异常: {e}")
                raise

    # snake_case alias; the usage example has always referred to this name.
    get_conn = getConn


# NOTE(review): credentials are hard-coded in the URI below; consider loading
# them from configuration or the environment instead of source control.
pg_pool = PGPool(
    uri="postgresql://postgres:123456@10.10.10.9/ktor2",
    min_size=1,
    max_size=20,
)