ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

并发测试代码学习

2022-09-01 20:03:46  阅读:152  来源: 互联网

标签:url self print 学习 并发 NUM time 测试代码 response


#!/usr/bin/python3  
# -*- coding: utf-8 -*-

import base64
import os
import urllib
import numpy as np
import requests, time, json, threading, random


class Presstest(object):
"""
并发压力测试
"""

def __init__(self, press_url):
self.press_url = press_url

def test_interface(self):
'''压测接口'''
global INDEX
INDEX += 1

global ERROR_NUM
global TIME_LENS
try:
start = time.time()
self.do_request(self.press_url)
end = time.time()
TIME_LENS.append(end - start)
print('end')
except Exception as e:
ERROR_NUM += 1
print(e)

def test_onework(self):
'''一次并发处理单个任务'''
i = 0
while i < ONE_WORKER_NUM:
i += 1
self.test_interface()
time.sleep(LOOP_SLEEP)

def do_request(self, url):
'''通用http获取webapi请求结果方法'''
# headers = {
# 'Content-Type': 'application/json; charset=UTF-8',
# }
# request = urllib.request.Request(url, headers=headers)
retry_num = 0
while retry_num < 3:
response = urllib.request.urlopen(url, timeout=300)
if not response or response.status == 421:
time.sleep(1)
retry_num = retry_num + 1
continue
else:
break
response_content = response.read()
if hasattr(response_content, 'decode'):
response_content = response_content.decode('utf-8')
return response_content

def run(self):
'''使用多线程进程并发测试'''
t1 = time.time()
Threads = []

for i in range(THREAD_NUM):
t = threading.Thread(target=self.test_onework, name="T" + str(i))
t.setDaemon(True)
Threads.append(t)

for t in Threads:
t.start()
for t in Threads:
t.join()
t2 = time.time()

print("===============压测结果===================")
print("URL:", self.press_url)
print("任务数量:", THREAD_NUM, "*", ONE_WORKER_NUM, "=", THREAD_NUM * ONE_WORKER_NUM)
print("总耗时(秒):", t2 - t1)
print("每次请求耗时(秒):", (t2 - t1) / (THREAD_NUM * ONE_WORKER_NUM))
print("每秒承载请求数:", 1 / ((t2 - t1) / (THREAD_NUM * ONE_WORKER_NUM)))
print("错误数量:", ERROR_NUM)
print(INDEX)


if __name__ == '__main__':
press_url = 'https://blog.csdn.net/m0_37886429/article/details/89020455'
TIME_LENS = []
INDEX = 0
THREAD_NUM = 1 # 并发线程总数
ONE_WORKER_NUM = 5 # 每个线程的循环次数
LOOP_SLEEP = 0 # 每次请求时间间隔(秒)
ERROR_NUM = 0 # 出错数

obj = Presstest(press_url)
obj.run()

标签:url,self,print,学习,并发,NUM,time,测试代码,response
来源: https://www.cnblogs.com/tuzichun/p/16647644.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有