Python实现自动化压力测试脚本

不管怎样,生活还是要继续向前走去。有的时候伤害和失败不见得是一件坏事,它会让你变得更好,孤单和失落亦是如此。每件事到最后一定会变成一件好事,只要你能够走到最后。纸上得来终觉浅,绝知此事要躬行。

需求说明

所谓的自动化压力测试脚本,就是同时访问一个请求,看服务器是否能正确响应和正常返回数据。抗压能力越强,说明服务器抗风险越强。

  • 1, 实现同时发送多个请求到一个网站,获取响应代码。
  • 2, 持续一定时间发送。

代码展示

import sys
import time
import thread
import httplib, urllib
import random
import uuid
import logging
logging.basicConfig(level=logging.DEBUG,
                format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                datefmt='%a, %d %b %Y %H:%M:%S',
                filename='测试脚本日志.log',
                filemode='w')
 
def log_uncaught_exceptions(exception_type, exception, tb):
    logging.critical(''.join(traceback.format_tb(tb)))
    logging.critical('{0}: {1}'.format(exception_type, exception))
sys.excepthook = log_uncaught_exceptions
 
#网关地址
addr="172.18.2.4"
port=8080
thread_count = 15 #单次并发数量
requst_interval = 10 #请求间隔(秒)
test_count = sys.maxsize #sys.maxsize  # 指定测试次数
 
 
#字段说明,必须一一对应
#login为空表示使用随机用户名
 
param_list=[
{"login":"user1","password":"qweqwe12"},
]
 
now_count = 0
lock_obj = thread.allocate()
def send_http():
    global now_count
    httpClient = None
    try:
        for user in user_list:
            tmp_user = user["login"]
            if tmp_user.strip() =='':
                tmp_user = str(uuid.uuid1()) + str(random.random())
            print tmp_user
            params = urllib.urlencode({"operationData":
                        [{"login": tmp_user,"password":user["password"]}]})
            headers = {"Content-type": "application/x-www-form-urlencoded", "Accept": "text/plain"}
 
            httpClient = httplib.HTTPConnection(addr, port, timeout=5)
            httpClient.request("POST", "/simple/spider.task.distribute", params, headers)
 
            response = httpClient.getresponse()
            print '发送数据: ' + params
            print '返回码: ' + str(response.status)
            print '返回数据: ' + response.read()
 
            logging.info('发送数据: ' + params)
            logging.info('返回码: ' + str(response.status))
            logging.info('返回数据: ' + response.read())
            #print response.getheaders() #获取头信息
            sys.stdout.flush()
            now_count+=1
    except Exception, e:
        print e
        logging.info(e)
    finally:
        if httpClient:
            httpClient.close()
 
def test_func(run_count):
    global now_count
    global requst_interval
    global lock_obj
    cnt = 0
    while cnt < run_count:
        lock_obj.acquire()
        print ''
        print '***************************请求次数:' + str(now_count) + '*******************************'
        print 'Thread:(%d) Time:%s\n'%(thread.get_ident(), time.ctime())
 
        logging.info(' ')
        logging.info('***************************请求次数:' + str(now_count) + '*******************************')
        logging.info('Thread:(%d) Time:%s\n'%(thread.get_ident(), time.ctime()))
        cnt+=1
        send_http()
        sys.stdout.flush()
        lock_obj.release()
        time.sleep(requst_interval)
 
def test(ct):
    global thread_count
    for i in range(thread_count):
        thread.start_new_thread(test_func,(ct,))
 
if __name__=='__main__':
    global test_count
    test(test_count)
    while True:
        time.sleep(100)

以上就是Python实现自动化压力测试脚本。爱情无需刻意去把握,越是想抓牢自己的爱情,反而容易失去自我。更多关于Python实现自动化压力测试脚本请关注haodaima.com其它相关文章!

您可能有感兴趣的文章
python 实现手机自动拨打电话的方法(通话压力测试)

python多线程http压力测试脚本

python实现websocket的客户端压力测试

python+ffmpeg视频并发直播压力测试

Python实现的多线程http压力测试代码