ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

MySQL异步驱动aiomysql

2022-01-28 19:02:34  阅读:182  来源: 互联网

标签:异步 await args cursor sql time MySQL aiomysql id


 

  本文介绍异步MySQL异步驱动aiomysql的使用

  1,安装异步模块

  如果没有模块则先使用pip安装模块

?
1 2 pip3 install asyncio pip3 install aiomysql

  2,创建MySQL数据库连接池

  和同步方式不一样的是使用异步不能直接创建数据库连接conn,需要先创建一个数据库连接池对象__pool通过这个数据库连接池对象来创建数据库连接

  数据库配置信息和介绍pymysql同步使用的数据库是一样的

?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import asyncio,aiomysql,time # 数据库配置dict db_config = {     'host': 'localhost',     'user': 'www-data',     'password': 'www-data',     'db': 'awesome' }   # 创建数据库连接池协程函数 async def create_pool(**kw):     global __pool     __pool = await aiomysql.create_pool(         host=kw.get('host', 'localhost'),         port=kw.get('port', 3306),         user=kw['user'],         password=kw['password'],         db=kw['db']     )   loop=asyncio.get_event_loop() loop.run_until_complete(create_pool(**db_config)) # 在事件循环中运行了协程函数则生成了全局变量__pool是一个连接池对象 <aiomysql.pool.Pool object at 0x00000244AD1724C8> print(__pool) # <aiomysql.pool.Pool object at 0x00000244AD1724C8>

  3,创建执行sql语句的协程函数

  因为是异步模块,只能在事件循环中通过await关键字调用,使用需要创建执行sql语句的协程函数

  在协程函数内使用全局上一步创建的连接池对象来创建连接conn和浮标对象cur,通过浮标对象来执行sql语句,执行方法和pymysql模块的执行方法是一样的

?
1 2 3 4 5 6 cursor.execute(sql,args) sql # 需要执行的sql语句例如'select * from table_name' args # 替换sql语句的格式化字符串,即sql语句可以使用%s代表一个字符串,然后在args中使用对应的变量或参数替换,args为一个list或元组,即是一个有序的序列需要和sql中的%s一一对应 # 例如sql='select * from table_name where id=%s'  args=['12345'] # 相当于使用args中的参数替换sql中的%s # select * from table_name where id='12345'

  下面分别创建两个协程函数select execute一个用来执行搜索操作,一个用来执行insert,update,delete等修改操作

?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 # 执行select函数 async def select(sql,args,size=None):     with await __pool as conn:         cur = await conn.cursor(aiomysql.DictCursor)         await cur.execute(sql.replace('?','?s'),args or ())         if size:             rs = await cur.fetchmany(size)         else:             rs = await cur.fetchall()         await cur.close()         return rs     # 执行insert update delete函数 async def execute(sql,args):     with await __pool as conn:         try:             cur = await conn.cursor()             await cur.execute(sql.replace('?','%s'),args)             affetced = cur.rowcount             await conn.commit()             await cur.close()         except BaseException as e:             raise         return affetced

  4,实践执行sql语句

  实践执行sql语句前我们首先在本机创建一个数据库和对应的表用于测试

  数据库对应的主机,用户名,密码,库名,表名如下

?
1 2 3 4 5 host: localhost user: www-data password: www-data db:awesome table_name: users

  创建表名的sql语句如下,需要在数据库中创建好对应的表

?
1 2 3 4 5 6 7 8 9 10 11 12 CREATE TABLE `users` (   `id` varchar(50) NOT NULL,   `email` varchar(50) NOT NULL,   `passwd` varchar(50) NOT NULL,   `admin` tinyint(1) NOT NULL,   `name` varchar(50) NOT NULL,   `image` varchar(500) NOT NULL,   `created_at` double NOT NULL,   PRIMARY KEY (`id`),   UNIQUE KEY `idx_email` (`email`),   KEY `idx_created_at` (`created_at`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8

  创建好的表对应的结构如下

?
1 2 3 4 5 6 7 8 9 10 11 12 13 mysql> desc users; +------------+--------------+------+-----+---------+-------+ | Field      | Type         | Null | Key | Default | Extra | +------------+--------------+------+-----+---------+-------+ | id         | varchar(50)  | NO   | PRI | NULL    |       | | email      | varchar(50)  | NO   | UNI | NULL    |       | | passwd     | varchar(50)  | NO   |     | NULL    |       | | admin      | tinyint(1)   | NO   |     | NULL    |       | | name       | varchar(50)  | NO   |     | NULL    |       | | image      | varchar(500) | NO   |     | NULL    |       | | created_at | double       | NO   | MUL | NULL    |       | +------------+--------------+------+-----+---------+-------+ 7 rows in set (2.68 sec)

  ①执行insert操作

?
1 2 3 4 5 6 7 8 # insert start import time sql = 'insert into `users` (`email`, `passwd`, `admin`, `name`, `image`, `created_at`, `id`) values (?, ?, ?, ?, ?, ?, ?)' args = ['test@qq.com','password',1,'test','about:blank',time.time(),'111111'] async def insert():     await execute(sql,args) loop.run_until_complete(insert()) # insert end

  执行方式和pymysql没有区别,不同的是需要在事件循环中使用关键字await调用

  执行完毕在MySQL中查看插入的数据

?
1 2 3 4 5 6 7 mysql> select * from users; +--------+-------------+----------+-------+------+-------------+------------------+ | id     | email       | passwd   | admin | name | image       | created_at       | +--------+-------------+----------+-------+------+-------------+------------------+ | 111111 | test@qq.com | password |     1 | test | about:blank | 1637738541.48629 | +--------+-------------+----------+-------+------+-------------+------------------+ 1 row in set (0.00 sec)

  ②执行update操作

  直接在loop事件循环中执行execute协程函数也可以

?
1 2 3 4 5 6 # update start import time sql = 'update `users` set `email`=?, `passwd`=?, `admin`=?, `name`=?, `image`=?, `created_at`=? where `id`=?' args = ['test2@qq.com','password',1,'test2','about:blank',time.time(),'111111'] loop.run_until_complete(execute(sql,args)) # update end

  执行以后把email和name都修改了

  ③执行delete操作

?
1 2 3 4 5 # delete start sql = 'delete from `users` where `id`=?' args = ['111111'] loop.run_until_complete(execute(sql,args)) # delete end

  同样根据关键字id指定的值删除了这条数据

  ④执行selete操作

  在执行select操作前我们保证数据库里面至少有一条数据

?
1 2 3 4 5 6 # select start sql = 'select * from users' args = [] rs = loop.run_until_complete(select(sql,args)) print(rs) # select end

  这里直接执行搜索的协程函数select根据函数的定义返回的是所有结果的list,元素是查询结果的字典

  输出为

?
1 [{'id': '111111', 'email': 'test@qq.com', 'passwd': 'password', 'admin': 1, 'name': 'test', 'image': 'about:blank', 'created_at': 1637739212.74493}]

  如果结果有多个则使用list的下标取出

  

  补充

  同步模块pymysql和异步模块aiomysql执行速度对比

  假如我们需要往数据库插入20000条数据,我们分别使用同步模式和异步模式

  首先删除数据库所有测试数据

?
1 delete from users;

  同步的代码

  d:/learn-python3/学习脚本/pymysql/use_pymysql.py

?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 import pymysql db_config = {     'host': 'localhost',     'user': 'www-data',     'password': 'www-data',     'db': 'awesome' } # 创建连接,相当于把字典内的键值对传递 # 相当于执行pymysql.connect(host='localhost',user='www-data',password='www-data',db='awesome') conn = pymysql.connect(**db_config) # 创建游标 cursor = conn.cursor(pymysql.cursors.DictCursor) sql = 'select * from users' args = [] # 执行查询返回结果数量 # 执行查询 rs=cursor.execute(sql,args) # 获取查询结果 # 获取查询的第一条结果,返回一个dict,dict元素是查询对应的键值对 # 如果查询结果有多条则执行一次,游标移动到下一条数据,在执行一次又返回一条数据 # print(cursor.fetchone()) # print(cursor.fetchone()) # print(cursor.fetchall()) # print(cursor.fetchmany()) # {'id': '111111', 'email': 'test@qq.com', 'passwd': 'password', 'admin': 1, 'name': 'test', 'image': 'about:blank', 'created_at': 1637723578.5734} # 获取查询的所有结果,返回一个list,list元素是dict,dict元素是查询对应的键值对 # print(cursor.fetchall()) # [{'id': '111111', 'email': 'test@qq.com', 'passwd': 'password', 'admin': 1, 'name': 'test', 'image': 'about:blank', 'created_at': 1637723578.5734}] # 获取查询的前几条结果,返回一个dict,dict元素是查询对应的键值对 # print(cursor.fetchmany(1)) # [{'id': '111111', 'email': 'test@qq.com', 'passwd': 'password', 'admin': 1, 'name': 'test', 'image': 'about:blank', 'created_at': 1637723578.5734}] # 执行修改操作 import time # # insert start sql = 'insert into `users` (`email`, `passwd`, `admin`, `name`, `image`, `created_at`, `id`) values (?, ?, ?, ?, ?, ?, ?)' args = ['test1@qq.com','password',1,'test','about:blank',time.time(),'1111121'] # 使用replace 把'?'替换成'%s' # rs = cursor.execute(sql.replace('?','%s'),args) # print(cursor.rowcount) # conn.commit() # print(rs) # insert end   # update start # sql = 'update `users` set `email`=?, `passwd`=?, `admin`=?, `name`=?, `image`=?, `created_at`=? where `id`=?' # args = ['test2@qq.com','password',1,'test2','about:blank',time.time(),'111111'] # print(cursor.execute(sql.replace('?','%s'),args)) # conn.commit() # update end   # delete start # sql = 'delete from `users` where `id`=?' # args = ['111111'] # print(cursor.execute(sql.replace('?','%s'),args)) # conn.commit() # delete end     # 写成函数调用,函数内部使用了数据库连接对象conn # 可以先定义成全局变量global def select(sql,args,size=None):           cursor =  conn.cursor(pymysql.cursors.DictCursor)     cursor.execute(sql.replace('?','%s'),args or ())     if size:         rs = cursor.fetchmany(size)     else:         rs = cursor.fetchall()     cursor.close     # logging.info('rows returned: %s' % len(rs))     return rs    def execute(sql,args):            cursor = conn.cursor(pymysql.cursors.DictCursor)     try:         cursor.execute(sql.replace('?','%s'),args)         # rowcount方法把影响函数返回         rs = cursor.rowcount         cursor.close()         conn.commit()     except:         raise     return rs   start_time = time.time() for n in range(20000):     sql = 'insert into `users` (`email`, `passwd`, `admin`, `name`, `image`, `created_at`, `id`) values (?, ?, ?, ?, ?, ?, ?)'     email = 'test%s@qq.com' %n     args = [email,'password',1,'test','about:blank',time.time(),n]     execute(sql,args) end_time = time.time() # 打印开始和结束时间的差 print(end_time - start_time)

  我们使用一个循环20000次往数据库插入数据

  执行,插入数据比较多需要等待一段时间输出

?
1 2 D:\learn-python3\函数式编程>C:/Python37/python.exe d:/learn-python3/学习脚本/pymysql/use_pymysql.py 77.46903562545776

  可以在数据库查询到这20000条数据,而且这个表的字段created_at存储了创建这条数据的时间戳,我们可以看到,id越往后的时间戳越往后,说明数据是同步按顺序一一插入的

  我们按照字段created_at排序查询

 

 

  下面我们删除所有数据使用异步方式插入

  异步的代码如下

  d:/learn-python3/学习脚本/aiomysql/use_aiomysql.py

?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 import asyncio,aiomysql,time # 数据库配置dict db_config = {     'host': 'localhost',     'user': 'www-data',     'password': 'www-data',     'db': 'awesome' }   # 创建数据库连接池协程函数 async def create_pool(**kw):     global __pool     __pool = await aiomysql.create_pool(         host=kw.get('host', 'localhost'),         port=kw.get('port', 3306),         user=kw['user'],         password=kw['password'],         db=kw['db']     )   loop=asyncio.get_event_loop() loop.run_until_complete(create_pool(**db_config)) # 在事件循环中运行了协程函数则生成了全局变量__pool是一个连接池对象 <aiomysql.pool.Pool object at 0x00000244AD1724C8> print(__pool) # <aiomysql.pool.Pool object at 0x00000244AD1724C8>   # 执行select函数 async def select(sql,args,size=None):     with await __pool as conn:         cur = await conn.cursor(aiomysql.DictCursor)         await cur.execute(sql.replace('?','?s'),args or ())         if size:             rs = await cur.fetchmany(size)         else:             rs = await cur.fetchall()         await cur.close()         return rs     # 执行insert update delete函数 async def execute(sql,args):     with await __pool as conn:         try:             cur = await conn.cursor()             await cur.execute(sql.replace('?','%s'),args)             affetced = cur.rowcount             await conn.commit()             await cur.close()         except BaseException as e:             raise         return affetced   # insert start # import time # sql = 'insert into `users` (`email`, `passwd`, `admin`, `name`, `image`, `created_at`, `id`) values (?, ?, ?, ?, ?, ?, ?)' # args = ['test@qq.com','password',1,'test','about:blank',time.time(),'111111'] # async def insert(): #     await execute(sql,args) # loop.run_until_complete(insert()) # insert end   # update start # import time # sql = 'update `users` set `email`=?, `passwd`=?, `admin`=?, `name`=?, `image`=?, `created_at`=? where `id`=?' # args = ['test2@qq.com','password',1,'test2','about:blank',time.time(),'111111'] # loop.run_until_complete(execute(sql,args))   # update end   # delete start # sql = 'delete from `users` where `id`=?' # args = ['111111'] # loop.run_until_complete(execute(sql,args)) # delete end   # select start # sql = 'select * from users' # args = [] # rs = loop.run_until_complete(select(sql,args)) # print(rs) # select end   async def insert1():      for n in range(10000):         sql = 'insert into `users` (`email`, `passwd`, `admin`, `name`, `image`, `created_at`, `id`) values (?, ?, ?, ?, ?, ?, ?)'         email = 'test%s@qq.com' %n         args = [email,'password',1,'test','about:blank',time.time(),n]         await execute(sql,args)   async def insert2():      for n in range(10001,20001):         sql = 'insert into `users` (`email`, `passwd`, `admin`, `name`, `image`, `created_at`, `id`) values (?, ?, ?, ?, ?, ?, ?)'         email = 'test%s@qq.com' %n         args = [email,'password',1,'test','about:blank',time.time(),n]         await execute(sql,args)   async def main():     # 需要组合成一个事件才会异步执行即在执行insert1的时候同步执行insert2     await asyncio.gather(insert1(),insert2())   start_time = time.time() loop.run_until_complete(main()) end_time = time.time() print(end_time - start_time)

  这里我们定义了两个协程函数,分别用来执行前10000个数据和后10000个数据的插入,在main()把这两个协程函数组合成一个事件循环

  等待一段时间后执行输出如下,忽略这个warning,可以看到执行时间明显比同步时间短

?
1 2 3 d:/learn-python3/学习脚本/aiomysql/use_aiomysql.py:42: DeprecationWarning: with await pool as conn deprecated, useasync with pool.acquire() as conn instead   with await __pool as conn: 39.794615507125854

  我们去数据库查询一下数据也可以看到id从0开始和id从10001开始几乎是同时插入的

 

 

 

 

标签:异步,await,args,cursor,sql,time,MySQL,aiomysql,id
来源: https://www.cnblogs.com/xiao-xue-di/p/15853542.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有