ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

sqlalchemy介绍和快速使用,创建操作数据表,scoped_session线程安全,基本增删查改,一对多,多对多,flask-sqlalchemy使用和flask-migrate使用

2022-08-12 17:00:53  阅读:207  来源: 互联网

标签:sqlalchemy Users flask self 数据表 session query id name


1 sqlalchemy介绍和快速使用

# django 的orm框架,对象关系映射,只能在djagno中用
# sqlalchemy:独立的orm框架,轻松的集成到任意项目中去,SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在 DB API之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果

# djagno 的orm,sqlalchemy,peewee。。。   目前异步的orm框架,没有特别好的

# 安装:pip3 install sqlalchemy

# 组成部分(了解)
    Engine,框架的引擎
    Connection Pooling ,数据库连接池
    Dialect,选择连接数据库的DB API种类
    Schema/Types,架构和类型
    SQL Exprression Language,SQL表达式语言
    
    
# SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件
pymysql
    mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]   
cx_Oracle
    oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
    
更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html

1.1 原生操作的快速使用

# 原生操作,写原生sql,用的比较少

import time
import threading
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine

engine = create_engine(
    "mysql+pymysql://root:123@127.0.0.1:3306/luffy_api",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
def task(arg):
    # 从连接池中拿一个链接
    conn = engine.raw_connection()
    cursor = conn.cursor()
    cursor.execute(
        "select * from luffy_banner"
    )
    result = cursor.fetchall()
    print(result)
    cursor.close()
    conn.close()

for i in range(20):
    t = threading.Thread(target=task, args=(i,))
    t.start()

2 创建操作数据表

import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index

Base = declarative_base()

class Users(Base):
    id = Column(Integer, primary_key=True)  # id 主键
    name = Column(String(32), index=True, nullable=False)  # name列,索引,不可为空
    email = Column(String(32), unique=True)
    #datetime.datetime.now不能加括号,加了括号,以后永远是当前时间
    ctime = Column(DateTime, default=datetime.datetime.now)
    extra = Column(Text, nullable=True)


    __tablename__ = 'users'  # 数据库表名称
    __table_args__ = (
        UniqueConstraint('id', 'name', name='uix_id_name'), #联合唯一
        Index('ix_id_name', 'name', 'email'), #索引
    )


# orm不能创建数据库,手动创建
#sqlalchemy只能创建表和删除表,不能新增,删除字段
# 创建表,同步到数据库
def init_db():
    """
    根据类创建数据库表
    """
    engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/aaa",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    # 创建被Base管理的所有表
    Base.metadata.create_all(engine)


def drop_db():
    """
    根据类删除数据库表
    """
    engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/aaa?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )

    Base.metadata.drop_all(engine)

if __name__ == '__main__':
    # init_db()
    drop_db()

    
# sqlalchemy 只能创建表,删除表,不能增加删除字段  (在flask中,可以使用第三方支持)

3 scoped_session线程安全

# 1 新增数据
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Users
from sqlalchemy.orm import scoped_session
from threading import Thread

# 第一步:创建engine
engine = create_engine(
    "mysql+pymysql://root:123@127.0.0.1:3306/aaa?charset=utf8",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

# 第二步:生成session,每次执行数据库操作时,都需要创建一个Session
Session = sessionmaker(bind=engine)
# 原来的时候
# session = Session()
# 变成了它,它就是线程安全,session对象在flask框架中就可以是全局的,任意线程使用的是就是这个全局session,但是不会出现数据错乱问题
# 本质内部使用了local对象,保证了,每个线程实际上使用的是当前线程自己的session
session = scoped_session(Session)



# scoped_session类的对象,正常来讲是没有 add, close,commit...方法和属性的,但是实际上有,是通过create_proxy_methods装饰器,设置进去的(通过反射setattr写进去的)


def task(i):
    # 第三步:插入数据
    user = Users(name='lqz%s'%i, email='33%s@qq.com'%i, extra='lqz is handsome')
    session.add(user)
    # 第四步:提交
    session.commit()
    # 第五步:关闭链接
    session.close()


for i in range(10):
    t = Thread(target=task,args=[i,])
    t.start()

4 基本增删查改

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Users
from sqlalchemy.sql import text

engine = create_engine(
    "mysql+pymysql://root:123@127.0.0.1:3306/aaa?charset=utf8",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
Session = sessionmaker(bind=engine)
session = Session()

# 1 增加数据一条
# session.add(对象)

# 2 同时增加多条
# session.add_all([Users(name='lqz123', email='asdf@qq.com', extra='asfasdf'),Users(name='pyy', email='aseedf@qq.com', extra='asdfeedd')])


# 3  删除  ---》查出来删
# res = session.query(Users).filter(Users.id > 17).delete()
# res = session.query(Users).filter(Users.name == 'lqz').delete()
# print(res)

# 4 修改---->查出来改
# res=session.query(Users).filter(Users.id > 10).update({"name" : "lqz"})
# 类似于django的F查询
# session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"}, synchronize_session=False)
# session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1}, synchronize_session="evaluate")

# 5 查询 ----》查询特别多
# 5.1 查所有
# res = session.query(Users).all()

# 5.2 指定查询的字段select name as xx,age from users;
# res = session.query(Users.name.label('xx'), Users.email)


# 5.3 通过filter 过滤
# filter写条件  >  <   ==
# res=session.query(Users).filter(Users.id > 0).all()
# res=session.query(Users).filter(Users.name == 'lqz2099').all()

# 5.4 filter_by 过滤,filter_by表达式
# res = session.query(Users).filter_by(name='lqz',email='332@qq.com').all()
# res = session.query(Users).filter_by(name='lqz').first()

# 5.5 自定制where部分查询sql
# res = session.query(Users).filter(text("id>:value or name=:name")).params(value=16, name='lqz099').all()
# res = session.query(Users).filter(Users.id<13 , Users.name=='lqz099').all()

# 5.6 纯自定义sql---》django中支持这个
# res = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='lqz').all()


# 5.7 filter和filter_by的其他使用
#  条件
# res = session.query(Users).filter_by(name='lqz').all()
# 表达式,and条件连接
# ret = session.query(Users).filter(Users.id > 1, Users.name == 'lqz').all()
# between
# res = session.query(Users).filter(Users.id.between(7, 9), Users.name == 'lqz').all()
# res = session.query(Users).filter(Users.id.between(7, 9)).all()
# res = session.query(Users).filter(Users.id.between(7, 9))  # 原生sql
# res = session.query(Users).filter(Users.id.between(7, 9), Users.name == 'lqz') # 原生sql

# in
# res = session.query(Users).filter(Users.id.in_([7,8,9])).all()
# ~非,除。。外
# res = session.query(Users).filter(~Users.id.in_([1, 3, 4])).all()

# 二次筛选
# res = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='lqz099'))).all()


# 5.8 与  或  非
# or_包裹的都是or条件,and_包裹的都是and条件
from sqlalchemy import and_, or_

# res = session.query(Users).filter(and_(Users.id > 3, Users.name == 'lqz099')).all()
# res = session.query(Users).filter(or_(Users.id < 2, Users.name == 'lqz099')).all()
# res = session.query(Users).filter(
#     or_(
#         Users.id < 2,
#         and_(Users.name == 'lqz099', Users.id > 3),
#         Users.extra != ""
#     ))

# 5.9 通配符,以e开头,不以e开头
# res = session.query(Users).filter(Users.name.like('lqz%')).all()
# res = session.query(Users).filter(~Users.name.like('%lqz%')).all()


# 5.10  限制,用于分页,区间
# res = session.query(Users)[1:2]
# 每页显示5条,第3页的数据
# res = session.query(Users)[3*5:3*5+5]


# 5.11 排序,根据name降序排列(从大到小)
# res = session.query(Users).order_by(Users.name.desc()).all()
# 第一个条件重复后,再按第二个条件升序排
# ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()


# 5.12 分组
# 分组
from sqlalchemy.sql import func

# select * from user group by name;
# res = session.query(Users).group_by(Users.name).all()
# 分组之后取最大id,id之和,最小id
# select max(id),sum(id),min(id) from user group by name;
# res = session.query(
#     func.max(Users.id),
#     func.sum(Users.id),
#     func.min(Users.id)).group_by(Users.name).all()
# haviing筛选
# select max(id),sum(id),min(id) from user where email like 33% group by name  having min(id) > 2;
res = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).filter(Users.email.like('33%')).group_by(Users.name).having(func.min(Users.id) > 2).all()







### 5.13 链表操作
# select * from users,favor where user.id= favor.nid;  # 先笛卡尔积再过滤
# ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all()
#join表,默认是inner join
# select * from person inner join favor on person.favor_id=favor.id;
# ret = session.query(Person).join(Favor).all()


#isouter=True 外连,表示Person left join Favor,没有右连接,反过来即可
# select * from person left join favor on person.favor_id=favor.id;
# ret = session.query(Person).join(Favor, isouter=True).all()
# 右链接
# ret = session.query(Favor).join(Person, isouter=True).all()

# 自己指定on条件(连表条件),第二个参数,支持on多个条件,用and_,同上
# select * from person left join favor on person.id=favor.id;
# ret = session.query(Person).join(Favor,Person.id==Favor.id, isouter=True).all()


## 5.14  union和union all的区别?
# 组合(了解)UNION 操作符用于合并两个或多个 SELECT 语句的结果集
#union和union all的区别?
# q1 = session.query(Users.name).filter(Users.id > 2)
# q2 = session.query(Favor.caption).filter(Favor.nid < 2)
# ret = q1.union(q2).all()
#
# q1 = session.query(Users.name).filter(Users.id > 2)
# q2 = session.query(Favor.caption).filter(Favor.nid < 2)
# ret = q1.union_all(q2).all()


print(res)

# 第四步:提交
session.commit()
# 第五步:关闭链接
session.close()

5 一对多

###一对多关系
class Hobby(Base):
    __tablename__ = 'hobby'
    id = Column(Integer, primary_key=True)
    caption = Column(String(50), default='篮球')


class Person(Base):
    __tablename__ = 'person'
    nid = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=True)
    # hobby指的是tablename而不是类名,uselist=False
    hobby_id = Column(Integer, ForeignKey("hobby.id"))


    # 跟数据库无关,不会新增字段,只用于快速链表操作
    # 基于对象的跨表查询:
    # 类名,backref用于反向查询
    hobby = relationship('Hobby', backref='pers')

    def __repr__(self):
        return self.name
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Person, Hobby
from sqlalchemy.sql import text

engine = create_engine(
    "mysql+pymysql://root:123@127.0.0.1:3306/aaa?charset=utf8",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
Session = sessionmaker(bind=engine)
session = Session()

# 1  一对多增加
# 方式一:
# session.add(Hobby(caption='足球'))
# session.add(Person(name='lqz',hobby_id=1))
# session.add(Person(name='pyy',hobby_id=2))  # 有约束,增加失败

# 方式二:
# session.add(Person(name='刘亦菲', hobby=Hobby(caption='橄榄球')))


# 2 一对多查询
# 2.1 基于对象的跨表查询
### 正向查询
# res = session.query(Person).filter(Person.name == '刘亦菲').first()
# print(res)
# print(res.hobby.caption)

## 反向查询
# res=session.query(Hobby).filter_by(caption='足球').first()
# print(res)
# # 取出喜欢足球的所有人---》反向查
# print(res.pers)


# 2.2 基于连表的跨表查询
# select * from person,hobby where person.hobby_id=hobby.id and person.name=彭于晏;
# res=session.query(Person,Hobby).filter(Person.hobby_id==Hobby.id,Person.name=='彭于晏').all()

#select * from person inner join hobby on person.hobby_id=hobby.id where person.name=彭于晏;
res = session.query(Person).join(Hobby).filter(Person.name=='彭于晏').all()

print(res)

session.commit()
session.close()

6 多对多

#多对多
class Boy2Girl(Base):
    __tablename__ = 'boy2girl'
    id = Column(Integer, primary_key=True, autoincrement=True)
    girl_id = Column(Integer, ForeignKey('girl.id'))
    boy_id = Column(Integer, ForeignKey('boy.id'))



class Girl(Base):
    __tablename__ = 'girl'
    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=False)

class Boy(Base):
    __tablename__ = 'boy'
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(64), unique=True, nullable=False)

    # 与生成表结构无关,仅用于查询方便,放在哪个单表中都可以
    #方便快速查询,写了这个字段,相当于django 的manytomany,快速使用基于对象的跨表查询
    girls = relationship('Girl', secondary='boy2girl', backref='boys')
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Boy,Girl,Boy2Girl
from sqlalchemy.sql import text

engine = create_engine(
    "mysql+pymysql://root:123@127.0.0.1:3306/aaa?charset=utf8",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
Session = sessionmaker(bind=engine)
session = Session()

# 1  多对多增加
# 方式一:所有表都有,一个个增加
# session.add(Boy(name='dxl'))
# session.add(Girl(name='zqh'))
# session.add_all([Boy(name='dxl'),Girl(name='zqh')])
# session.add_all([Boy2Girl(boy_id=1,girl_id=1),Boy2Girl(boy_id=1,girl_id=2)])


# 方式二:
# session.add(Boy(name='张涛',girls=[Girl(name='芙蓉姐姐'),Girl(name='凤姐')]))


# 2 一对多查询
# 2.1 基于对象的跨表查询
### 正向查询

# res=session.query(Boy).filter_by(name='张涛').first()
# print(res.girls)

## 反向查询
# res=session.query(Girl).filter_by(name='凤姐').first()
# print(res.boys)



# 2.2 基于连表的跨表查询


# print(res)

session.commit()
session.close()

7 flask-sqlalchemy使用和flask-migrate使用

# flask-sqlalchemy:帮助我们快速把sqlalchemy集成到flask中
# 表字段增加删除不支持:flask-migrate,以后增加了字段,删除字段只需要两条迁移命令就完成了

### flask—sqlalchemy
# 第一步:导入SQLAlchemy,实例化得到对象db
# 第二步:注册
	db.init_app(app)
# 第三步:创建表模型,继承db.Model
# 第四步:使用session
	db.session   处理了线程安全
    
    
    
    
### 把表同步到数据库中---》flask-migrate
	## 基于flask_script,定制几个命令  python manage.py db init/ migrate/upgrade
    
    # 第一个命令:
    python38 manage.py db init  # 初始化,项目使用的时候,只敲一次,生成migrations文件夹
    # 第二个命令:只要改了表结构,增加表,删除表,增加字段,删除字段
    python38 manage.py db migrate # 不会在数据库中生成记录,只是记录变化等同于makemigrations
    
    # 第三个命令
	python38 manage.py db upgrade  # 等同于migrate

8 请求上下文分析

    def wsgi_app(self, environ, start_response):
        # 读完了:ctx:是RequestContext的对象,内部有当次请求的request对象,session对象,app对象,flash对象
        ctx = self.request_context(environ)
        error = None
        try:
            try:
                ctx.push()
                response = self.full_dispatch_request()
            except Exception as e:
                error = e
                response = self.handle_exception(e)
            except:  # noqa: B001
                error = sys.exc_info()[1]
                raise
            return response(environ, start_response)
        finally:
            if self.should_ignore_error(error):
                error = None
            ctx.auto_pop(error)
请求上下文执行流程(ctx):
		-0 flask项目一启动,有6个全局变量
			-_request_ctx_stack:LocalStack对象
			-_app_ctx_stack :LocalStack对象
			-request : LocalProxy对象
			-session : LocalProxy对象
		-1 请求来了 app.__call__()---->内部执行:self.wsgi_app(environ, start_response)
		-2 wsgi_app()
			-2.1 执行:ctx = self.request_context(environ):返回一个RequestContext对象,并且封装了request(当次请求的request对象),session,flash,当前app对象
			-2.2 执行: ctx.push():RequestContext对象的push方法
				-2.2.1 push方法中中间位置有:_request_ctx_stack.push(self),self是ctx对象
				-2.2.2 去_request_ctx_stack对象的类中找push方法(LocalStack中找push方法)
				-2.2.3 push方法源码:
				    def push(self, obj):
						#通过反射找self._local,在init实例化的时候生成的:self._local = Local()
						#Local(),flask封装的支持线程和协程的local对象
						# 一开始取不到stack,返回None
						rv = getattr(self._local, "stack", None)
						if rv is None:
							#走到这,self._local.stack=[],rv=self._local.stack
							self._local.stack = rv = []
						# 把ctx放到了列表中
						#self._local={'线程id1':{'stack':[ctx,]},'线程id2':{'stack':[ctx,]},'线程id3':{'stack':[ctx,]}}
						rv.append(obj)
						return rv
		-3 如果在视图函数中使用request对象,比如:print(request)
			-3.1 会调用request对象的__str__方法,request类是:LocalProxy
			-3.2 LocalProxy中的__str__方法:lambda x: str(x._get_current_object())
				-3.2.1 内部执行self._get_current_object()
				-3.2.2 _get_current_object()方法的源码如下:
				    def _get_current_object(self):
						if not hasattr(self.__local, "__release_local__"):
							#self.__local()  在init的时候,实例化的,在init中:object.__setattr__(self, "_LocalProxy__local", local)
							# 用了隐藏属性
							#self.__local 实例化该类的时候传入的local(偏函数的内存地址:partial(_lookup_req_object, "request"))
							#加括号返回,就会执行偏函数,也就是执行_lookup_req_object,不需要传参数了
							#这个地方的返回值就是request对象(当此请求的request,没有乱)
							return self.__local()
						try:
							return getattr(self.__local, self.__name__)
						except AttributeError:
							raise RuntimeError("no object bound to %s" % self.__name__)
				-3.2.3 _lookup_req_object函数源码如下:
					def _lookup_req_object(name):
						#name是'request'字符串
						#top方法是把第二步中放入的ctx取出来,因为都在一个线程内,当前取到的就是当次请求的ctx对象
						top = _request_ctx_stack.top
						if top is None:
							raise RuntimeError(_request_ctx_err_msg)
						#通过反射,去ctx中把request对象返回
						return getattr(top, name)
				-3.2.4 所以:print(request) 实质上是在打印当此请求的request对象的__str__
		-4 如果在视图函数中使用request对象,比如:print(request.method):实质上是取到当次请求的reuquest对象的method属性
		
		-5 最终,请求结束执行: ctx.auto_pop(error),把ctx移除掉
		
	其他的东西:
		-session:
			-请求来了opensession
				-ctx.push()---->也就是RequestContext类的push方法的最后的地方:
					if self.session is None:
						#self是ctx,ctx中有个app就是flask对象,   self.app.session_interface也就是它:SecureCookieSessionInterface()
						session_interface = self.app.session_interface
						self.session = session_interface.open_session(self.app, self.request)
						if self.session is None:
							#经过上面还是None的话,生成了个空session
							self.session = session_interface.make_null_session(self.app)
			-请求走了savesession
				-response = self.full_dispatch_request() 方法内部:执行了before_first_request,before_request,视图函数,after_request,savesession
				-self.full_dispatch_request()---->执行:self.finalize_request(rv)-----》self.process_response(response)----》最后:self.session_interface.save_session(self, ctx.session, response)
		-请求扩展相关
			before_first_request,before_request,after_request依次执行
		-flask有一个请求上下文,一个应用上下文
			-ctx:
				-是:RequestContext对象:封装了request和session
				-调用了:_request_ctx_stack.push(self)就是把:ctx放到了那个位置
			-app_ctx:
				-是:AppContext(self) 对象:封装了当前的app和g
				-调用 _app_ctx_stack.push(self) 就是把:app_ctx放到了那个位置
	-g是个什么鬼?
		专门用来存储用户信息的g对象,g的全称的为global 
		g对象在一次请求中的所有的代码的地方,都是可以使用的 
		
		
	-代理模式
		-request和session就是代理对象,用的就是代理模式

标签:sqlalchemy,Users,flask,self,数据表,session,query,id,name
来源: https://www.cnblogs.com/chunyouqudongwuyuan/p/16580633.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有