ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

FastAPI用户安全性解决方案

2021-11-18 21:32:02  阅读:251  来源: 互联网

标签:username FastAPI self db 解决方案 str fake password 安全性


1.需求分析

  1. 用户登录验证
  2. 用户登录保持

2.实现思路

  1. 用户发送用户名和密码到服务器,服务器端验证后,生成并返回令牌(token)。
  2. 用户再次访问时,每次请求携带token,访问私有信息。

3.支持模块

后端支持:

  1. 用于校验密码的哈希算法。
  2. 令牌计算和解译工具。
  3. 数据库和数据库连接工具,为简化操作,这里使用文件存储代替。
  4. 路由处理。
  5. 跨越请求。

前端支持:

  1. ajax请求模块。
  2. 本地端口服务。

4.文件组织

采用前后端分离方法。前端和后端分别放在不同的文件夹。
后端文件如下:

main.py //主文件
my_fake_db_connect.py//模拟数据库连接工具
fake_data.json//用json文件模拟数据库
my_tools.py//密码校验和令牌创建、解译工具,包含类PasswordCheck,TokenCreator

实现代码

main.py

main.py文件主要作用是处理路由请求,配置跨域许可,调用各类对象和方法,其中关键代码在路由方法中。

#文件名:main.py
from fastapi import FastAPI,Depends,HTTPException, status
from fastapi.middleware.cors import CORSMiddleware#跨域请求
from my_fake_db_connect import fake_db_connect#虚拟数据库连接
from my_tools import PasswordCheck,TokenCreator
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")#初始化安全性方案对象

#初始化工具类
tokencreator=TokenCreator()
passwordcheck=PasswordCheck('fake_data.json')
db=fake_db_connect('fake_data.json')

# 放行跨域请求的域名

origins = [
    "http://localhost.tiangolo.com",
    "https://localhost.tiangolo.com",
    "http://localhost",
    "http://localhost:8080",
    "http://localhost:8848",
    "http://127.0.0.1:8848",
    "http://127.0.0.1:5500"
]

app=FastAPI()

# 添加跨域方案
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)



#游客的访问
@app.get("/hello")
async def hello(name:str):
    return "hello,"+name+"!"

#登录,返回令牌
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    '''
    处理登录请求
    '''
    #user = authenticate_user(
    #    fake_users_db, form_data.username, form_data.password)
    username=form_data.username
    password=form_data.password

    if not passwordcheck.check(username, password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token=tokencreator.create(username)
    print('返回用户令牌',access_token)
    # 返回用户令牌
    return {"access_token": access_token, "token_type": "bearer"}


@app.get("/user/me")
async def get_current_user(token: str = Depends(oauth2_scheme)):
    '''
    获取当前用户信息。
    '''
    # 默认权限错误信息
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    # 尝试解码token获取username
    username=tokencreator.decode(token)
    
    if username is None: raise credentials_exception

	# 尝试从数据库取用户信息
    try: userinfo=db.get(username)
    except: raise credentials_exception
       
    if userinfo is None: raise credentials_exception
    
    return userinfo
    

my_tools.py

这里封装了本案例中最核心的方法。即密码的校验、令牌的生成和解译。

class PasswordCheck():
    '''
    密码校验工具。
    初始化方法:password_check=PasswordCheck("数据库名")。
    密码检查方法:.check("用户名","密码"),返回布尔类型。
    '''
    from my_fake_db_connect import fake_db_connect#数据库连接模块
    from passlib.context import CryptContext#哈希算法模块
    pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")#定义一个哈希算法

    def __init__(self,db:str):#
        self.db=self.fake_db_connect(db)
        pass


    def check(self,username:str,password:str)->bool:
        try:
            userinfo=self.db.get(username)#尝试取数据
            hashed_password=userinfo.get('hashed_password')
            if self.pwd_context.verify(password, hashed_password):
                return True
            else: return False
        except:
            return False
      

    def __call__(self,username:str,password:str)->bool:
        self.check(username, password)


class TokenCreator():
    '''
    令牌生成器。
    '''
    #算法库
    from jose import JWTError, jwt

    from datetime import datetime, timedelta
    from typing import Optional

    #令牌加密方法和有效期
    SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
    ALGORITHM = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES = 30


    def __init__(self):
        pass
    
    def __call__(self,username:str):
        self.create(username)

    def create(self,username:str,expires_delta:Optional[timedelta] = None)->str:
        if expires_delta:
            # 失效时间等于当前时间加有效期
            expire = self.datetime.utcnow()+expires_delta
        else:
            expire = self.datetime.utcnow()+self.timedelta(minutes=15)

        data={"sub":username,'exp':expire}#写入令牌的数据
        encode_jwt = self.jwt.encode(data, self.SECRET_KEY, algorithm=self.ALGORITHM)#生成令牌
        return encode_jwt

    def decode(self,token:str)->str:
        '''
        解码Token,返回用户名
        '''
        try:
            payload = self.jwt.decode(token, self.SECRET_KEY, algorithms=[self.ALGORITHM])
            username: str = payload.get("sub")
            if username is None:
                return None

        except self.JWTError:
            return None
            print('解码令牌出错')
        return username

my_fake_db_connect.py

这里使用JSON文件模拟数据库存储,使用python自带功能模拟数据库的增删改查,这样做数据丢失风险极大,这里仅作为测试时的替代方法,实际工作中务必使用真正的数据库。

class fake_db_connect():
    '''
    模拟数据库连接工具
    '''
    import json
    def __init__(self,filename:str):
        self.filename=filename
        with open(filename, 'r',encoding='utf-8') as f:
            self.fake_user_table = self.json.load(f)
    
    def get(self,primaryKey:str)->dict:
        return self.fake_user_table.get(primaryKey)

    def insert(self,info:dict):
        primaryKey=info['username']
        self.fake_user_table[primaryKey]=info

    def commit(self):
        with open(self.filename, 'w',encoding='utf-8') as f:
            self.json.dump(self.fake_user_table, f, ensure_ascii=False,indent=2)
    
    def delete(self,primaryKey:str):
        del self.fake_user_table[primaryKey]

fake_data.json

{
  "johndoe": {
    "username": "johndoe",
    "full_name": "John Doe",
    "email": "johndoe@example.com",
    "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
    "disabled": false
  },
  "alice": {
    "username": "alice",
    "full_name": "Alice Wonderson",
    "email": "alice@example.com",
    "hashed_password": "fakehashedsecret2",
    "disabled": true
  },
  "lihua": {
    "username": "lihua",
    "full_name": "Li Hua",
    "email": "lihua@example.com",
    "hashed_password": "fakehashedmima",
    "disabled": true
  }
}

标签:username,FastAPI,self,db,解决方案,str,fake,password,安全性
来源: https://blog.csdn.net/ashtyukjhf/article/details/121409637

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有