ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

python 阿里oss上传文件封装

2021-12-20 09:02:54  阅读:181  来源: 互联网

标签:封装 OSS python oss settings upload dict str self


import traceback
from datetime import datetime
from logging import getLogger

import oss2
from django.conf import settings
from rest_framework import status

logger = getLogger('sms')


class OSS:
    def __init__(self, connect_timeout: int = 30):
        """
        :param connect_timeout: 连接超时时间
        """
        self.base_url: str = settings.OSS_BASE_URL
        self.connect_timeout: int = connect_timeout
        self.space_prefix: str = settings.OSS_SPACE_PREFIX
        self.auth_kwargs: dict = dict(
            access_key_id=settings.OSS_ACCESS_KEY_ID,
            access_key_secret=settings.OSS_ACCESS_KEY_SECRET
        )
        self.auth = oss2.Auth(**self.auth_kwargs)
        self.bucket_kwargs: dict = dict(
            auth=self.auth,
            endpoint=settings.OSS_ENDPOINT,
            bucket_name=settings.OSS_BUCKET_NAME,
            connect_timeout=connect_timeout
        )
        self.bucket = oss2.Bucket(**self.bucket_kwargs)

    def upload(
            self,
            file: bytes,
            file_name: str,
            category: str,
            user_code: str
    ) -> str:
        """
        上传单个文件

        :param file: 文件
        :param file_name: 文件名称(用户输入的名称)
        :param category: 文件类型
        :param user_code: 用户唯一编码
        :return: 文件上传后的URL
        """
        upload_date: str = str(datetime.now())
        upload_path = f"{self.space_prefix}/{user_code}/{category}/{upload_date}/{file_name}"
        fixed_path: str = self.base_url + upload_path
        upload_kwargs: dict = dict(
            key=upload_path,
            data=file,
        )
        try:
            response = self.bucket.put_object(**upload_kwargs)
            if response.status == status.HTTP_200_OK:
                return fixed_path
            logger.error(traceback.format_exc())
            logger.info(f'OSS文件上传错误 状态:{response.status}')
            print(f'OSS文件上传错误 状态:{response.status}')
            return ''
        except Exception as e:
            logger.error(traceback.format_exc())
            logger.info(f'OSS文件上传错误:{e}')
            print(f'OSS文件上传错误:{e}')
            return ''


oss_client = OSS()

标签:封装,OSS,python,oss,settings,upload,dict,str,self
来源: https://www.cnblogs.com/tao-xiaoxin/p/15709416.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有