Files
AILab/bbit_ai/app/config/httpClient.py
T
2026-02-04 13:58:18 +08:00

58 lines
1.7 KiB
Python

import httpx
from fastapi import HTTPException
class HttpClient:
def __init__(self, timeout: int = 10):
self.timeout = timeout
self.client = httpx.AsyncClient(timeout=timeout)
async def request(
self,
method: str,
url: str,
params: dict = None,
data: dict = None,
json: dict = None,
headers: dict = None,
):
"""
通用请求方法
method: GET / POST / PUT / DELETE 等
url: 请求地址
params: URL 参数
data: form 数据
json: JSON 数据
headers: 请求头
"""
try:
response = await self.client.request(
method=method.upper(),
url=url,
params=params,
data=data,
json=json,
headers=headers,
)
response.raise_for_status()
return response.json()
except httpx.RequestError as e:
raise HTTPException(status_code=500, detail=f"请求失败: {e}")
except httpx.HTTPStatusError as e:
raise HTTPException(
status_code=e.response.status_code, detail=f"外部接口返回错误: {e}"
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"解析失败: {e}")
async def get(self, url: str, params: dict = None, headers: dict = None):
return await self.request("GET", url, params=params, headers=headers)
async def post(
self, url: str, data: dict = None, json: dict = None, headers: dict = None
):
return await self.request("POST", url, data=data, json=json, headers=headers)
async def close(self):
await self.client.aclose()