"""PostgreSQL connection pool wrapper built on psycopg and psycopg_pool."""

import logging
import os
import time
from contextlib import ExitStack, contextmanager
from typing import Iterator, Optional

import psycopg
from psycopg_pool import ConnectionPool


# Module-wide logger for pool events; the explicit name "PGPool" (rather than
# __name__) is kept so existing logging configuration keyed on it still applies.
logger = logging.getLogger("PGPool")
logger.setLevel(logging.INFO)


class PGPool:
    """PostgreSQL connection pool wrapper.

    Owns a ``psycopg_pool.ConnectionPool`` (created in the constructor) and
    hands out connections through :meth:`getConn`, a context manager that
    retries the checkout when ``psycopg.OperationalError`` is raised.
    """

    def __init__(
        self,
        uri: str,
        min_size: int = 1,
        max_size: int = 20,
        max_idle: int = 30,
        max_lifetime: int = 300,
        timeout: int = 10,
        check: bool = False,
    ):
        """Create the underlying connection pool.

        :param uri: PostgreSQL connection URI.
        :param min_size: forwarded to ``ConnectionPool`` as ``min_size``.
        :param max_size: forwarded to ``ConnectionPool`` as ``max_size``.
        :param max_idle: forwarded to ``ConnectionPool`` as ``max_idle``.
        :param max_lifetime: forwarded to ``ConnectionPool`` as
            ``max_lifetime``.
        :param timeout: forwarded to ``ConnectionPool`` as ``timeout``.
        :param check: ``True`` installs ``ConnectionPool.check_connection``
            as the pool's checkout callback, ``False`` installs none. A
            non-bool value (e.g. a custom callback) is passed through as is.
        """
        self.uri = uri

        # ConnectionPool's ``check`` argument is a callback invoked with the
        # connection, not a flag: forwarding a literal ``True`` would make the
        # pool try to call ``True(conn)``. Translate the flag here.
        if isinstance(check, bool):
            check_callback = ConnectionPool.check_connection if check else None
        else:
            check_callback = check

        self.pool = ConnectionPool(
            self.uri,
            min_size=min_size,
            max_size=max_size,
            max_idle=max_idle,
            max_lifetime=max_lifetime,
            timeout=timeout,
            check=check_callback,
            # Opening on construction is the behaviour this class has always
            # relied on; state it explicitly because psycopg_pool documents
            # that the implicit default is going to change.
            open=True,
        )

    @contextmanager
    def getConn(
        self, retries: int = 2, delay: float = 1.0
    ) -> Iterator["psycopg.Connection"]:
        """Borrow a connection from the pool, retrying the checkout.

        Only *obtaining* the connection is retried. Errors raised inside the
        caller's ``with`` body are logged and re-raised unchanged: a
        ``@contextmanager`` generator may yield only once, so the body cannot
        be re-run from here.

        Usage::

            with pg_pool.getConn() as conn:
                with conn.cursor() as cur:
                    cur.execute(...)

        :param retries: number of additional checkout attempts after the
            first one fails.
        :param delay: seconds to sleep between checkout attempts.
        :raises psycopg.OperationalError: if no connection could be obtained
            after all attempts (chained to the last underlying error).
        """
        # The ExitStack keeps ``self.pool.connection()`` entered while the
        # caller uses the connection and guarantees its ``__exit__`` runs
        # (with the in-flight exception, if any) when the block ends.
        with ExitStack() as stack:
            conn = self._checkout(stack, retries, delay)
            try:
                conn.autocommit = True
                yield conn
            except Exception as e:
                logger.error(f"SQL执行异常: {e}")
                raise

    # snake_case alias: the usage example historically shown in this
    # method's documentation called ``get_conn``.
    get_conn = getConn

    def _checkout(self, stack: ExitStack, retries: int, delay: float):
        """Enter ``self.pool.connection()`` on *stack*, retrying on OperationalError."""
        last_error = None
        for attempt in range(retries + 1):
            try:
                return stack.enter_context(self.pool.connection())
            except psycopg.OperationalError as e:
                last_error = e
                logger.warning(f"数据库连接异常: {e}. 尝试重试 ({attempt+1}/{retries})")
                self.pool.check()  # ask the pool to verify its connections before the next attempt
            # No point sleeping once every attempt has been used up.
            if attempt < retries:
                time.sleep(delay)
        raise psycopg.OperationalError("无法获取数据库连接,多次重试失败") from last_error


# Shared module-level pool instance. The connection URI can be supplied via
# the PG_POOL_URI environment variable so credentials do not have to live in
# the source; when it is unset the previously hard-coded URI is used, so
# existing deployments behave exactly as before.
pg_pool = PGPool(
    uri=os.environ.get(
        "PG_POOL_URI",
        "postgresql://postgres:123456@10.10.10.9/ktor2",
    ),
    min_size=1,
    max_size=20,
)