from datetime import timedelta

from minio import Minio
from minio.commonconfig import CopySource

# MinIO client initialization.
# NOTE(review): the endpoint and credentials are hard-coded here, and the
# access/secret keys are the well-known MinIO defaults. Consider loading them
# from configuration or environment variables instead of source code.
minio_client = Minio(
    "ai.ronsunny.cn:9000",
    access_key="minioadmin",
    secret_key="minioadmin",
    region="Chengdu",
    secure=True,
)

# Placeholder object returned when the caller does not identify a real one.
_DEFAULT_BUCKET = "system"
_DEFAULT_PREFIX = "default"
_DEFAULT_OBJECT = "favicon.ico"


def push_file(bucket_name, object_name, file_bytes, contents, content_type):
    """Upload an object to MinIO.

    Args:
        bucket_name: Name of the target bucket.
        object_name: Key of the object inside the bucket.
        file_bytes: Data source passed straight to ``put_object``
            (presumably a readable binary stream such as ``io.BytesIO`` --
            confirm against callers).
        contents: Sized payload; only ``len(contents)`` is used, as the
            upload length (presumably the raw bytes wrapped by
            ``file_bytes`` -- confirm against callers).
        content_type: Content type stored with the object.
    """
    minio_client.put_object(
        bucket_name,
        object_name,
        file_bytes,
        length=len(contents),
        content_type=content_type,
    )


def get_upload_token(bucket_name, object_name, xpires=timedelta(minutes=15)):
    """Return a presigned PUT URL for uploading ``object_name``.

    Args:
        bucket_name: Name of the target bucket.
        object_name: Key the uploaded object will have.
        xpires: Validity period of the URL as a ``timedelta`` (15 minutes by
            default). The parameter name is kept as-is for caller
            compatibility.

    Returns:
        The value returned by ``presigned_put_object``.
    """
    return minio_client.presigned_put_object(
        bucket_name=bucket_name, object_name=object_name, expires=xpires
    )


def get_temp_url(bucket_name, object_name, seconds: float = 3600):
    """Return a presigned GET URL for an object.

    If either ``bucket_name`` or ``object_name`` is ``None`` or empty, the URL
    of the default image (``system/favicon.ico``) is returned instead.

    Args:
        bucket_name: Name of the bucket holding the object.
        object_name: Key of the object inside the bucket.
        seconds: Validity period of the URL in seconds (one hour by default).

    Returns:
        The value returned by ``presigned_get_object``.
    """
    if not object_name or not bucket_name:
        bucket_name = _DEFAULT_BUCKET
        object_name = _DEFAULT_OBJECT

    return minio_client.presigned_get_object(
        bucket_name, object_name, expires=timedelta(seconds=seconds)
    )


def get_temp_url_dict(bucket_name, object_dict, object_name):
    """Return a one-hour presigned GET URL for ``object_dict/object_name``.

    If either ``bucket_name`` or ``object_name`` is ``None`` or empty, the URL
    of the default image (``system/default/favicon.ico``) is returned
    instead, mirroring the fallback in ``get_temp_url``.

    Args:
        bucket_name: Name of the bucket holding the object.
        object_dict: Prefix ("directory") the object lives under. When it is
            ``None`` or empty the object is looked up without a prefix.
        object_name: Name of the object below ``object_dict``.

    Returns:
        The value returned by ``presigned_get_object``.
    """
    if not object_name or not bucket_name:
        bucket_name = _DEFAULT_BUCKET
        object_dict = _DEFAULT_PREFIX
        object_name = _DEFAULT_OBJECT

    # A missing prefix must not raise TypeError (None + "/") or yield a key
    # with a leading slash; address the object by its bare name instead.
    object_key = f"{object_dict}/{object_name}" if object_dict else object_name

    return minio_client.presigned_get_object(
        bucket_name, object_key, expires=timedelta(seconds=3600)
    )


def move_file(bucket_name, source_object_name, target_object_name):
    """Move an object within a bucket (implemented as copy + delete).

    Args:
        bucket_name: Name of the bucket.
        source_object_name: Current object path, e.g. ``folder1/test.jpg``.
        target_object_name: Destination object path, e.g.
            ``folder2/test.jpg``.
    """
    # Moving an object onto itself is a no-op. Without this guard the delete
    # below would remove the only copy whenever the self-copy succeeds.
    if source_object_name == target_object_name:
        return

    # Copy to the new location first; the source is removed only after the
    # copy call has returned without raising.
    minio_client.copy_object(
        bucket_name,
        target_object_name,
        CopySource(bucket_name, source_object_name),
    )
    minio_client.remove_object(bucket_name, source_object_name)


def delete_file(bucket_name, object_name):
    """Delete an object from a bucket.

    Args:
        bucket_name: Name of the bucket.
        object_name: Object path, e.g. ``folder/test.jpg``.
    """
    minio_client.remove_object(bucket_name, object_name)