当前位置 博文首页 > 疯狂的机器人:Locust1.4.3版本性能测试工具案例分享

    疯狂的机器人:Locust1.4.3版本性能测试工具案例分享

    作者:疯狂的机器人 时间:2021-02-08 18:27

    一、Locust工具介绍

    1.概述

    Locust是一款易于使用的分布式负载测试工具,完全基于事件,使用python开发,即一个locust节点也可以在一个进程中支持数千并发用户,不使用回调,通过gevent使用轻量级过程(即在自己的进程内运行)。

     

    2.常见性能测试工具比较

     

    3.环境搭建

    Python3.7.9

    源码安装:下载源码https://github.com/locustio/locust,进入文件夹执行安装命令:python setup.py install

    pip安装:pip install locust==1.4.3

     

    二、Locust常用类和方法

     

    三、Locust常用参

     

    四、案例

    1.参数化

    # -*- coding: utf-8 -*-
    import os,random
    from locust import  TaskSet, task,HttpUser, between
    
    #任务类
    class Testlocust(TaskSet):
        def on_start(self):
            self.login_headers={'x-client-id': 'xxxxxx'}          #头文件
            self.data=[
                {
                    "account_name": "登录账号1",
                    "user_name": "登录账号1",
                    "hashed_password": "登录密码1"
                },
                {
                    "account_name": "登录账号2",
                    "user_name": "登录账号2",
                    "hashed_password": "登录密码2"
                },
                {
                    "account_name": "登录账号3",
                    "user_name": "登录账号3",
                    "hashed_password": "登录密码3"
                }
            ]                                               #登录账号密码
            print("------on start------")
    
        @task()
        def userlogin(self):
            r = self.client.post(url='/v1/auth/users/login', headers=self.login_headers,json=random.choice(self.data),name='登录',verify=False)  #使用choice方法参数化随机登录
            assert r.status_code == 200 and r.json()['status']==0                                #登录断言
    
        def on_stop(self):
            print("------on stop------")
    
    #用户类
    class WebsiteUser(HttpUser):                        #locust1.0版本以前是HttpLocust
        tasks = [Testlocust]                            #locust1.0版本以前是task_set=Testlocust
        host = 'https://xxxxxx'                         #被测主机地址
        wait_time = between(min_wait=1,max_wait=5)      #任务之间的等待时间
    
    if __name__ == "__main__":
        os.system("locust -f locust_XX.py")             #执行locust脚本

     

    2.关联

    # -*- coding: utf-8 -*-
    import os,requests
    from locust import  TaskSet, task,HttpUser, between
    
    #任务类
    class Testlocust(TaskSet):
        def on_start(self):
            print("------on start------")
            access_token = self.userlogin()                                         #返回登录token
            self.headers = {}                                                       #定义headers
            self.headers['x-api-key'] = 'fZkQSHC1dp2s0tL21EMtaNX3UjF7P6L9'          #添加headers值
            self.headers['Authorization'] = 'Bearer ' + access_token
            self.headers['x-key-hash'] = '1607675936446;abcdefg;bca1ef2b5835e454a15929f7ce9cb5d7ebaf580377624019002'
            self.headers['Content-Type'] = 'application/json'
            self.params = {"name": "CDR", "page": "1", "size": "10", "series": "CDR80"}    #查询接口参数
    
        def userlogin(self):                                                        #登录方法
            login_url = 'https://xxxxxx/v1/auth/users/login'                        #登录url
            data = {
                "account_name": "账号1",
                "user_name": "账号1",
                "hashed_password": "密码1"
            }                                                                       #登录账号密码
            login_headers = {'x-client-id': '46bf882df2959ea2'}
            r = requests.request('POST', url=login_url, headers=login_headers, json=data)  #登录
            return r.json()['access_token']                                         #返回登录token
    
        @task()
        def search(self):                                                                   #查询设备信息
            r = self.client.get(url='/v1/assets/device/search', headers=self.headers,params=self.params, name='查询', verify=False)              #查询
            assert r.status_code == 200 and r.json()['code'] == 0                           #结果断言
    
    #用户类
    class WebsiteUser(HttpUser):                        #locust1.0版本以前是HttpLocust
        tasks = [Testlocust]                            #locust1.0版本以前是task_set=Testlocust
        host = 'https://xxxxxx.com'                     #被测主机地址
        wait_time = between(min_wait=1,max_wait=5)      #任务之间的等待时间
    
    if __name__ == "__main__":
        os.system("locust -f locust_XX.py")             #执行参数
    bk