ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Locust如何测试物联网MQTT

2021-12-14 09:35:15  阅读:210  来源: 互联网

标签:python Locust mqtt 联网 topic MQTT client connect msg


MQTT是干什么的
简单来说,它是物联网的通信协议,是消息通道建立,消息发送和消息订阅的标准。如果大家想了解更多概念上的详细可以网上搜索。
Locust测试MQTT的步骤
测试步骤,可以用以下图形表示:
在这里插入图片描述

准备环境,安装Locust测试环境
这个比较简单,主要是准备好Python的虚拟开发环境,并安装好locust的python软件包。可以在Locust官方网站找到相关步骤,这里不再赘述。
安装MQTT客户端库 paho-mqtt
可以在Python虚拟环境,执行如下安装命令:
pip install paho-mqtt

locustfile中首先实现消息发送
这里为了简单起见,使用公共的MQTT broker: EMQX, 它的地址为:"
broker.emqx.io"
具体实现的代码如下:


```python
broker_add = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt_topic_for_python"

client_id = f"python-mqtt-{random.randint(0,100)}"

def connect_mqtt():
    def on_connect(client,userdata,flags,rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n",rc)

    client = mqtt_c.Client(client_id)
    client.on_connect= on_connect
    client.connect(broker_add,port)

    return client

def publish(client):
    msg_count = 0
    while True:
        time.sleep(3)
        msg = f"message: {msg_count}"
        result = client.publish(topic,msg)
        status = result[0]
        if status == 0:
            print(f"send `{msg}` to topic `{topic}` ")
        else:
            print(f"Failed to send message to topic {topic}")
        msg_count += 1

def run():
    client = connect_mqtt()
    client.loop_start()
    publish(client)

完善locustfile中关于Taskset和User的相关配置
具体如下代码,所以会发现,用Python-Locust去测试非HTTP协议的应用系统还是比较方便的,代码即测试。

class TheTaskSet(TaskSet):
    @task
    def task_1(self):
        run()


class TheUser(User):
    tasks = [TheTaskSet]
    wait_time = constant_pacing(1)

完整的locustfile如下:

import random,time

from paho.mqtt import client as mqtt_c
from locust import TaskSet,task,User,constant_pacing

broker_add = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt_topic_for_python"

client_id = f"python-mqtt-{random.randint(0,100)}"

def connect_mqtt():
    def on_connect(client,userdata,flags,rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n",rc)

    client = mqtt_c.Client(client_id)
    client.on_connect= on_connect
    client.connect(broker_add,port)

    return client

def publish(client):
    msg_count = 0
    while True:
        time.sleep(3)
        msg = f"message: {msg_count}"
        result = client.publish(topic,msg)
        status = result[0]
        if status == 0:
            print(f"send `{msg}` to topic `{topic}` ")
        else:
            print(f"Failed to send message to topic {topic}")
        msg_count += 1

def run():
    client = connect_mqtt()
    client.loop_start()
    publish(client)


class TheTaskSet(TaskSet):
    @task
    def task_1(self):
        run()


class TheUser(User):
    tasks = [TheTaskSet]
    wait_time = constant_pacing(1)

具体的执行结果如下:
Connected to MQTT Broker!
send message: 0 to topic /python/mqtt_topic_for_python
send message: 1 to topic /python/mqtt_topic_for_python
send message: 2 to topic /python/mqtt_topic_for_python
send message: 3 to topic /python/mqtt_topic_for_python
send message: 4 to topic /python/mqtt_topic_for_python
send message: 5 to topic /python/mqtt_topic_for_python

参考文档:EMQ官方文档,https://www.emqx.com/zh/blog/how-to-use-mqtt-in-python

标签:python,Locust,mqtt,联网,topic,MQTT,client,connect,msg
来源: https://blog.csdn.net/qq_43305605/article/details/121919597

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有