并发测试代码学习
2022/9/1 23:26:10
本文主要是介绍并发测试代码学习,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
#!/usr/bin/python3 # -*- coding: utf-8 -*- import base64 import os import urllib import numpy as np import requests, time, json, threading, random class Presstest(object): """ 并发压力测试 """ def __init__(self, press_url): self.press_url = press_url def test_interface(self): '''压测接口''' global INDEX INDEX += 1 global ERROR_NUM global TIME_LENS try: start = time.time() self.do_request(self.press_url) end = time.time() TIME_LENS.append(end - start) print('end') except Exception as e: ERROR_NUM += 1 print(e) def test_onework(self): '''一次并发处理单个任务''' i = 0 while i < ONE_WORKER_NUM: i += 1 self.test_interface() time.sleep(LOOP_SLEEP) def do_request(self, url): '''通用http获取webapi请求结果方法''' # headers = { # 'Content-Type': 'application/json; charset=UTF-8', # } # request = urllib.request.Request(url, headers=headers) retry_num = 0 while retry_num < 3: response = urllib.request.urlopen(url, timeout=300) if not response or response.status == 421: time.sleep(1) retry_num = retry_num + 1 continue else: break response_content = response.read() if hasattr(response_content, 'decode'): response_content = response_content.decode('utf-8') return response_content def run(self): '''使用多线程进程并发测试''' t1 = time.time() Threads = [] for i in range(THREAD_NUM): t = threading.Thread(target=self.test_onework, name="T" + str(i)) t.setDaemon(True) Threads.append(t) for t in Threads: t.start() for t in Threads: t.join() t2 = time.time() print("===============压测结果===================") print("URL:", self.press_url) print("任务数量:", THREAD_NUM, "*", ONE_WORKER_NUM, "=", THREAD_NUM * ONE_WORKER_NUM) print("总耗时(秒):", t2 - t1) print("每次请求耗时(秒):", (t2 - t1) / (THREAD_NUM * ONE_WORKER_NUM)) print("每秒承载请求数:", 1 / ((t2 - t1) / (THREAD_NUM * ONE_WORKER_NUM))) print("错误数量:", ERROR_NUM) print(INDEX) if __name__ == '__main__': press_url = 'https://blog.csdn.net/m0_37886429/article/details/89020455' TIME_LENS = [] INDEX = 0 THREAD_NUM = 1 # 并发线程总数 ONE_WORKER_NUM = 5 # 每个线程的循环次数 LOOP_SLEEP = 0 # 每次请求时间间隔(秒) ERROR_NUM = 0 # 出错数 obj = Presstest(press_url) obj.run()
这篇关于并发测试代码学习的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-05-09“2024鸿蒙零基础快速实战-仿抖音App开发(ArkTS版)”实战课程已上线
- 2024-05-09聊聊如何通过arthas-tunnel-server来远程管理所有需要arthas监控的应用
- 2024-05-09log4j2这么配就对了
- 2024-05-09nginx修改Content-Type
- 2024-05-09Redis多数据源,看这篇就够了
- 2024-05-09Google Chrome驱动程序 124.0.6367.62(正式版本)去哪下载?
- 2024-05-09有没有大佬知道这种数据应该怎么抓取呀?
- 2024-05-09这种运行结果里的10.100000001,怎么能最快改成10.1?
- 2024-05-09企业src漏洞挖掘-有意思的命令执行
- 2024-05-08阿里云域名注册流程,分享给第一次购买域名的新手站长!