Files
AILab/bbit_ai/app/config/minIO.py
T
2026-03-26 17:48:20 +08:00

80 lines
2.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from datetime import timedelta
from minio import Minio
from minio.commonconfig import CopySource
# MinIO 客户端初始化
minio_client = Minio(
"ai.ronsunny.cn:9000",
access_key="minioadmin",
secret_key="minioadmin",
region="Chengdu",
secure=True,
)
def push_file(bucket_name, object_name, file_bytes, contents, content_type):
minio_client.put_object(
bucket_name,
object_name,
file_bytes,
length=len(contents),
content_type=content_type,
)
def get_upload_token(bucket_name, object_name, xpires=timedelta(minutes=15)):
return minio_client.presigned_put_object(
bucket_name=bucket_name, object_name=object_name, expires=xpires
)
def get_temp_url(bucket_name, object_name, seconds: float = 3600):
# 如果 object_name 为 None 或空字符串,则返回默认图片
if not object_name or not bucket_name:
bucket_name = "system"
object_name = "favicon.ico"
# 使用 presigned_get_object 获取临时 URL
return minio_client.presigned_get_object(
bucket_name, object_name, expires=timedelta(seconds=seconds)
)
def get_temp_url_dict(bucket_name, object_dict, object_name):
# 如果 object_name 为 None 或空字符串,则返回默认图片
if not object_name:
bucket_name = "system"
object_dict = "default"
object_name = "favicon.ico"
# 使用 presigned_get_object 获取临时 URL
return minio_client.presigned_get_object(
bucket_name, object_dict + "/" + object_name, expires=timedelta(seconds=3600)
)
# 移动文件(实际上是 copy + delete
def move_file(bucket_name, source_object_name, target_object_name):
"""
bucket_name: bucket名称
source_object_name: 原对象路径,例如 folder1/test.jpg
target_object_name: 目标对象路径,例如 folder2/test.jpg
"""
# 复制到新位置
minio_client.copy_object(
bucket_name, target_object_name, CopySource(bucket_name, source_object_name)
)
# 删除原文件
minio_client.remove_object(bucket_name, source_object_name)
# 删除文件
def delete_file(bucket_name, object_name):
"""
bucket_name: bucket名称
object_name: 文件路径,例如 folder/test.jpg
"""
minio_client.remove_object(bucket_name, object_name)