80 lines
2.3 KiB
Python
80 lines
2.3 KiB
Python
from datetime import timedelta
|
||
|
||
from minio import Minio
|
||
from minio.commonconfig import CopySource
|
||
|
||
# MinIO 客户端初始化
|
||
minio_client = Minio(
|
||
"ai.ronsunny.cn:9000",
|
||
access_key="minioadmin",
|
||
secret_key="minioadmin",
|
||
region="Chengdu",
|
||
secure=True,
|
||
)
|
||
|
||
|
||
def push_file(bucket_name, object_name, file_bytes, contents, content_type):
|
||
minio_client.put_object(
|
||
bucket_name,
|
||
object_name,
|
||
file_bytes,
|
||
length=len(contents),
|
||
content_type=content_type,
|
||
)
|
||
|
||
|
||
def get_upload_token(bucket_name, object_name, xpires=timedelta(minutes=15)):
|
||
return minio_client.presigned_put_object(
|
||
bucket_name=bucket_name, object_name=object_name, expires=xpires
|
||
)
|
||
|
||
|
||
def get_temp_url(bucket_name, object_name, seconds: float = 3600):
|
||
# 如果 object_name 为 None 或空字符串,则返回默认图片
|
||
if not object_name or not bucket_name:
|
||
bucket_name = "system"
|
||
object_name = "favicon.ico"
|
||
|
||
# 使用 presigned_get_object 获取临时 URL
|
||
return minio_client.presigned_get_object(
|
||
bucket_name, object_name, expires=timedelta(seconds=seconds)
|
||
)
|
||
|
||
|
||
def get_temp_url_dict(bucket_name, object_dict, object_name):
|
||
# 如果 object_name 为 None 或空字符串,则返回默认图片
|
||
if not object_name:
|
||
bucket_name = "system"
|
||
object_dict = "default"
|
||
object_name = "favicon.ico"
|
||
|
||
# 使用 presigned_get_object 获取临时 URL
|
||
return minio_client.presigned_get_object(
|
||
bucket_name, object_dict + "/" + object_name, expires=timedelta(seconds=3600)
|
||
)
|
||
|
||
|
||
# 移动文件(实际上是 copy + delete)
|
||
def move_file(bucket_name, source_object_name, target_object_name):
|
||
"""
|
||
bucket_name: bucket名称
|
||
source_object_name: 原对象路径,例如 folder1/test.jpg
|
||
target_object_name: 目标对象路径,例如 folder2/test.jpg
|
||
"""
|
||
# 复制到新位置
|
||
minio_client.copy_object(
|
||
bucket_name, target_object_name, CopySource(bucket_name, source_object_name)
|
||
)
|
||
|
||
# 删除原文件
|
||
minio_client.remove_object(bucket_name, source_object_name)
|
||
|
||
|
||
# 删除文件
|
||
def delete_file(bucket_name, object_name):
|
||
"""
|
||
bucket_name: bucket名称
|
||
object_name: 文件路径,例如 folder/test.jpg
|
||
"""
|
||
minio_client.remove_object(bucket_name, object_name)
|