ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

httpx_send_file

2021-09-29 13:00:07  阅读:142  来源: 互联网

标签:files name send pop file kwargs httpx clint


# -*- coding: utf-8 -*-
# !/usr/bin/env python
# Software: PyCharm
# __author__ == "YU HAIPENG"
# fileName: httpx_send_file.py
# Month: 九月
# time: 2021/9/16 15:37
""" Write an introduction to the module here """
import httpx
import os

__all__ = ["send_file", "async_send_file"]


def send_file(url, files, clint: httpx.Client = None, *args, **kwargs):
    """

    @param url:
    @param files:
            FileTypes = Union[
                # file (or text or path)
                FileContent,
                # (filename, file (or text))
                Tuple[str, FileContent],
                # (filename, file (or text), content_type)
                Tuple[str, FileContent, str],
            ]
        Files = Union[dict[str, FileTypes], [[str, FileTypes]], [dict[str, FileTypes]]]
        Files = {"remote_name": {"file_name": Union[path, bytes]}}
        Files = path
        Files = bytes
    @param args:
    param remote_name = None 远程接收名字
    @param kwargs:
    @param clint:
    @return:
    """
    files = __md5_check(__parse_file(files, kwargs))
    close_flag = False
    if not clint:
        clint = httpx.Client(
            verify=kwargs.pop("verify", True),
            cert=kwargs.pop("cert", None),
            http2=kwargs.pop("http2", False),
            event_hooks=kwargs.pop("event_hooks", None),
            proxies=kwargs.pop("proxies", None),
            mounts=kwargs.pop("mounts", None),
            base_url=kwargs.pop("base_url", ''),
            transport=kwargs.pop("transport", None),
            trust_env=kwargs.pop("trust_env", True),
        )
        close_flag = True
    try:
        timeout = kwargs.pop('timeout', 20)
        method = kwargs.pop('method', 'post')
        assert method.lower() in ['post', "put", "patch"]
        clint_function = getattr(clint, method.lower())
        response = clint_function(url, files=files, timeout=timeout, *args, **kwargs)
    finally:
        if close_flag:
            clint.close()
    return response


async def async_send_file(url, files, clint: httpx.AsyncClient = None, *args, **kwargs):
    """

       @param url:
       @param files:
               FileTypes = Union[
                   # file (or text or path)
                   FileContent,
                   # (filename, file (or text))
                   Tuple[str, FileContent],
                   # (filename, file (or text), content_type)
                   Tuple[str, FileContent, str],
               ]
           Files = Union[dict[str, FileTypes], [[str, FileTypes]], [dict[str, FileTypes]]]
           Files = {"remote_name": {"file_name": Union[path, bytes]}}
           Files = path
           Files = bytes
       @param args:
       param remote_name = None 远程接收名字
       @param kwargs:
       @param clint:
       @return:
       """

    files = __md5_check(__parse_file(files, kwargs))
    timeout = kwargs.pop('timeout', 20)
    method = kwargs.pop('method', 'post')
    assert method.lower() in ['post', "put", "patch"]
    if clint:
        clint_function = getattr(clint, method.lower())
        response = await clint_function(url, files=files, timeout=timeout, *args, **kwargs)
    else:
        async with httpx.AsyncClient(
                verify=kwargs.pop("verify", True),
                cert=kwargs.pop("cert", None),
                http2=kwargs.pop("http2", False),
                event_hooks=kwargs.pop("event_hooks", None),
                proxies=kwargs.pop("proxies", None),
                mounts=kwargs.pop("mounts", None),
                base_url=kwargs.pop("base_url", ''),
                transport=kwargs.pop("transport", None),
                trust_env=kwargs.pop("trust_env", True),
        ) as clint:
            clint_function = getattr(clint, method.lower())
            response = await clint_function(url, files=files, timeout=timeout, *args, **kwargs)
    return response


def __parse_file(files, kwargs):
    """
    文件参数处理
    @param files:
    @param kwargs:
    @return:
    """
    remote_name = kwargs.pop("remote_name", "file")

    def get_file(path, return_name=False):
        if os.path.isfile(path):
            filename = os.path.split(path)[-1]
            with open(path, 'rb') as __f:
                path = __f.read()
            if return_name:
                return [filename, path]
        return path

    if isinstance(files, dict):
        for key, value in files.items():
            if not value:
                files = None
                break
            elif isinstance(value, bytes):
                continue
            elif isinstance(value, str):
                files[key] = tuple(get_file(value, return_name=True))
            elif isinstance(value, dict):
                for file_name, content in value.items():
                    if isinstance(content, bytes):
                        files[key] = (file_name, content)
                    elif isinstance(content, str):
                        if not file_name:
                            files[key] = tuple(get_file(content, return_name=True))
                        else:
                            files[key] = (file_name, get_file(content))
                    break
            else:
                if 2 <= len(value) <= 3:
                    files[key] = list(value)
                    if isinstance(files[key][1], str):
                        if not files[key][0]:
                            files[key][0], files[key][1] = get_file(
                                files[key][1], return_name=True)
                        else:
                            files[key][1] = get_file(files[key][1])
                        files[key] = tuple(files[key])
                else:
                    raise ValueError("value length must (2 or 3)")
    elif isinstance(files, (tuple, list)):
        files = list(files)

        for _, value in enumerate(files):
            if isinstance(value, dict):
                # {'doc': ('doc', 'C:\\Users\\yuhaipeng\\Desktop\\网神信息.txt', 'application/plan')}
                for k, v in value.items():
                    value = (k, v)
                    break
            value = list(value)
            files[_] = value
            if isinstance(value[1], str):
                value[1] = tuple(get_file(value[1], return_name=True))
                files[_] = tuple(value)
            elif isinstance(value[1], (tuple, list)):
                value[1] = list(value[1])
                if 2 <= len(value[1]) <= 3:
                    if isinstance(value[1][1], str):
                        if not value[1][0]:
                            value[1][0], value[1][1] = get_file(value[1][1], return_name=True)
                        else:
                            value[1][1] = get_file(value[1][1])
                        value[1] = tuple(value[1])
                    elif isinstance(value[1][1], bytes):
                        value[1] = tuple(value[1])
                    else:
                        if not value[1][0]:
                            value[1][0] = "name"
                        value[1] = tuple(value[1])
                    files[_] = tuple(value)
                else:
                    raise ValueError("value length must (2 or 3)")
            elif isinstance(value[1], bytes):
                files[_] = tuple(value)
    elif isinstance(files, str):
        files = ((remote_name, tuple(get_file(files, return_name=True))),)
    elif isinstance(files, bytes):
        files = ((remote_name, files),)
    else:
        raise ValueError('error')
    return files


def __md5_check(files):
    """
    文件重复去重
    @return:
    """
    md5_check_li = list()
    import hashlib
    if isinstance(files, (tuple, list)):
        file_len = len(files)
        if file_len > 1:
            files = list(files)
            row = 0
            while row < file_len:
                file = files[row]
                if not isinstance(file[1][1], bytes):
                    digest = hashlib.md5(file[1][:10000]).hexdigest()
                else:
                    digest = hashlib.md5(file[1][1][:10000]).hexdigest()
                if digest not in md5_check_li:
                    md5_check_li.append(digest)
                else:
                    files.remove(file)
                    file_len -= 1
                    continue
                row += 1

    elif isinstance(files, dict):
        file_list = list(files.keys())
        file_len = len(file_list)
        if file_len > 1:
            row = 0
            while row < file_len:
                digest = hashlib.md5(files[file_list[row]][1][:10000]).hexdigest()
                if digest not in md5_check_li:
                    md5_check_li.append(digest)
                else:
                    files.pop(file_list[row], None)
                row += 1

    return files


if __name__ == '__main__':
    r = r"C:\Users\yuhaipeng\Desktop\网神信息.txt"
    # with httpx.Client() as session:
    #     print(send_file(
    #         "http://192.168.1.247/api/doc/html",
    #         files=r,
    #         # clint=session,
    #         remote_name="doc",
    #         data={"a": 1},
    #         params={"b": 1},
    #         method='post'
    #     ).text)

    import asyncio


    #
    # loop = asyncio.get_event_loop()
    #
    # x, b = loop.run_until_complete(
    #     asyncio.wait(
    #         [
    #             async_send_file(
    #                 "http://192.168.1.247/api/doc/html",
    #                 files=r,
    #                 # clint=session,
    #                 remote_name="doc",
    #             )
    #         ]
    #     )
    # )
    # for i in x:
    #     print(i.result().json())

    async def send_main(**kwargs):
        async with httpx.AsyncClient() as cli:
            # response = await async_send_file(
            #     "http://192.168.1.247/api/doc/html",
            #     files=[("doc", r)],
            #     clint=cli,
            #     remote_name="doc",
            # )
            # response = await async_send_file(
            #     "http://192.168.1.247/api/doc/html",
            #     files=r,
            #     clint=cli,
            #     remote_name="doc",
            # )

            response = await async_send_file(
                "http://192.168.1.247/api/doc/html",
                files=r,
                clint=cli,
                remote_name="doc",
            )

            # response = await async_send_file(
            #     "http://192.168.1.247/api/doc/html",
            #     files={"doc": ("file_name", r, "abcd")},
            #     clint=cli,
            #     remote_name="doc",
            # )
        print(response.json())


    loop = asyncio.get_event_loop()

    loop.run_until_complete(
        asyncio.wait(
            [
                send_main()
            ]
        )
    )

标签:files,name,send,pop,file,kwargs,httpx,clint
来源: https://www.cnblogs.com/yuhaipeng/p/15352361.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有