67 lines
1.8 KiB
Python
67 lines
1.8 KiB
Python
import os
from uuid import UUID

import jwt
from fastapi import Header, HTTPException
from jwt import PyJWTError
from starlette.websockets import WebSocketDisconnect
|
|
|
|
# Parameters passed to jwt.decode when verifying incoming tokens.
# The signing secret can be supplied through the JWT_SECRET environment
# variable. The literal fallback keeps existing setups working, but it lives
# in source control, so production deployments should always set the variable.
JWT_SECRET = os.environ.get("JWT_SECRET") or "secret_jwt"
JWT_ALGORITHM = "HS256"
JWT_AUDIENCE = "snowflake-ink"
JWT_ISSUER = "https://snowflake.ink/"
|
|
|
|
|
|
def get_user_id_from_token(token: str = Header(..., alias="Authorization")) -> UUID:
    """Parse the ``Authorization`` header and return the token's user ID.

    The client is expected to send ``Authorization: Bearer <token>``. The
    token is decoded with ``JWT_SECRET`` / ``JWT_ALGORITHM`` and checked
    against ``JWT_AUDIENCE`` and ``JWT_ISSUER``. It must also carry a
    ``token_type`` claim equal to ``"access_token"`` and a ``user_id`` claim.

    Args:
        token: Raw value of the ``Authorization`` request header.

    Returns:
        The ``user_id`` claim converted to a ``UUID``.

    Raises:
        HTTPException: With status 401 when the header does not start with
            ``"Bearer "``, the token fails to decode or verify, the token
            type is not ``"access_token"``, or the ``user_id`` claim is
            missing or is not a valid UUID.
    """
    scheme_prefix = "Bearer "
    if not token.startswith(scheme_prefix):
        raise HTTPException(status_code=401, detail="Invalid token format")
    token = token[len(scheme_prefix):]

    try:
        payload = jwt.decode(
            token,
            JWT_SECRET,
            algorithms=[JWT_ALGORITHM],
            audience=JWT_AUDIENCE,
            issuer=JWT_ISSUER,
        )
    except PyJWTError as exc:
        raise HTTPException(
            status_code=401, detail="Token is missing or invalid"
        ) from exc

    if payload.get("token_type") != "access_token":
        raise HTTPException(status_code=401, detail="Invalid token type")

    user_id = payload.get("user_id")
    if not user_id:
        raise HTTPException(status_code=401, detail="User ID not found in token")

    # A correctly signed token can still carry a malformed or non-string
    # user_id. Report that as an auth failure rather than letting the
    # conversion error surface as an internal server error.
    try:
        return UUID(user_id)
    except (ValueError, AttributeError, TypeError) as exc:
        raise HTTPException(
            status_code=401, detail="Invalid user ID in token"
        ) from exc
|
|
|
|
|
|
def get_user_id_from_token_from_ws(token: str) -> UUID:
    """Return the user ID carried by a token supplied on a WebSocket.

    Unlike the HTTP variant, the ``"Bearer "`` prefix is optional here: it is
    stripped when present and a bare token is accepted as-is. The token is
    decoded with ``JWT_SECRET`` / ``JWT_ALGORITHM`` and checked against
    ``JWT_AUDIENCE`` and ``JWT_ISSUER``.

    Args:
        token: The JWT, with or without a leading ``"Bearer "``.

    Returns:
        The ``user_id`` claim converted to a ``UUID``.

    Raises:
        WebSocketDisconnect: When the token fails to decode or verify, its
            ``token_type`` claim is not ``"access_token"``, or the
            ``user_id`` claim is missing or is not a valid UUID.
    """
    scheme_prefix = "Bearer "
    if token.startswith(scheme_prefix):
        token = token[len(scheme_prefix):]

    try:
        payload = jwt.decode(
            token,
            JWT_SECRET,
            algorithms=[JWT_ALGORITHM],
            audience=JWT_AUDIENCE,
            issuer=JWT_ISSUER,
        )
    except PyJWTError as exc:
        # An invalid token ends the connection.
        raise WebSocketDisconnect() from exc

    if payload.get("token_type") != "access_token":
        raise WebSocketDisconnect()

    user_id = payload.get("user_id")
    if not user_id:
        raise WebSocketDisconnect()

    # Treat a malformed or non-string user_id like any other invalid token
    # instead of letting the conversion error escape to the caller.
    try:
        return UUID(user_id)
    except (ValueError, AttributeError, TypeError) as exc:
        raise WebSocketDisconnect() from exc
|