ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

FastAPI 学习之路(五十八)对之前的代码进行优化

2021-10-28 05:31:19  阅读:207  来源: 互联网

标签:FastAPI self await ws user str 代码 def 五十八


我们之前登录认证的一些内容都直接写入到代码中,我们现在统一的给放到config文件中。

SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

我们需要把uer.py修改

from config import *
#对应的方法
def create_access_token(data: dict):
    to_encode = data.copy()
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_cure_user(request: Request, token: Optional[str] = Header(...)) -> UserBase:
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="验证失败"
    )
    credentials_FOR_exception = HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="用户未登录或者登陆token已经失效"
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        useris = await request.app.state.redis.get(username)
        if not useris and useris!=token:
            raise credentials_FOR_exception
        user = UserBase(email=username)
        return user
    except JWTError:
        raise credentials_exception

 在存储redis的地方也需要修改

request.app.state.redis.set(user.email, token,
 expire=ACCESS_TOKEN_EXPIRE_MINUTES * 60)

我们把main.py的redis相关的也放在配置里

#config.py
redishost='127.0.0.1'
redisport='6379'
redisdb='0'

 main.py修改

async def get_redis_pool() -> Redis:
    redis = await create_redis_pool(f"redis://:@"+redishost+":"+redisport+"/"+redisport+"?encoding=utf-8")
    return redis

调整之前的websocket和file相关的代码。统一放在routers文件下,创建

websoocket.py。

from fastapi import APIRouter,WebSocket,Depends,WebSocketDisconnect
from typing import List, Dict
from routers.user import get_cure_user
socketRouter = APIRouter()
class ConnectionManager:
    def __init__(self):
        # 存放**的链接
        self.active_connections: List[Dict[str, WebSocket]] = []

    async def connect(self, user: str, ws: WebSocket):
        # 链接
        await ws.accept()
        self.active_connections.append({"user": user, "ws": ws})

    def disconnect(self, user: str, ws: WebSocket):
        # 关闭时 移除ws对象
        self.active_connections.remove({"user": user, "ws": ws})

    async def send_other_message_json(self, message: dict, user: str):
        # 发送个人消息
        for connection in self.active_connections:
            if connection["user"] == user:
                await connection['ws'].send_json(message)

    async def broadcast_json(self, data: dict):
        # 广播消息
        for connection in self.active_connections:
            await connection['ws'].send_json(data)

    @staticmethod
    async def send_personal_message(message: str, ws: WebSocket):
        # 发送个人消息
        await ws.send_text(message)

    async def send_other_message(self, message: str, user: str):
        # 发送个人消息
        for connection in self.active_connections:
            if connection["user"] == user:
                await connection['ws'].send_text(message)

    async def broadcast(self, data: str):
        # 广播消息
        for connection in self.active_connections:
            await connection['ws'].send_text(data)


manager = ConnectionManager()

@socketRouter.websocket("/ws/{user}/")
async def websocket_many_point(
        websocket: WebSocket,
        user: str,
        cookie_or_token: str = Depends(get_cure_user),
):
    await manager.connect(user, websocket)
    try:
        while True:
            data = await websocket.receive_json()
            senduser = data['username']
            if senduser:
                await manager.send_other_message_json(data, senduser)
            else:
                await  manager.broadcast_json(data)
    except WebSocketDisconnect as e:
        manager.disconnect(user, websocket)

 对于file相关的也放在files文件下

from fastapi import APIRouter, File, UploadFile
from typing import List
fileRouter = APIRouter()
@fileRouter.post("/uploadfile/")
async def upload_file(fileS: List[UploadFile] = File(...)):
    return {"filenames": [file.filename for file in fileS]}

然后我们在main.py优化

from routers.websoocket import socketRouter
from routers.file import  fileRouter
#我们在最后注册这个router
app.include_router(socketRouter, prefix="/socket", tags=['socket'])
app.include_router(fileRouter, prefix="/file", tags=['file'])

调整完整后,我们的接口更加清晰了。我们启动下,看我们先有的接口。

 

 

代码存储
https://gitee.com/liwanlei/fastapistuday

  

文章首发在公众号,欢迎关注。

标签:FastAPI,self,await,ws,user,str,代码,def,五十八
来源: https://www.cnblogs.com/leiziv5/p/15416945.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有