ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

thingsboard rpc应用程序控制传感器模拟

2021-02-06 17:05:50  阅读:838  来源: 互联网

标签:me devices v1 rpc client 程序控制 thingsboard data


使用服务器端rpc来控制模拟的温度传感器

效果图如下:
在这里插入图片描述
下面把我的设置和代码记录一下:

增加设备

在这里插入图片描述
如果使用服务器端RPC命令,不需要在增加规则链了

增加控制按钮

在这里插入图片描述
注意这里的methond方法
在这里插入图片描述
getvalue setvalue要与代码中对应的接收方法一致

rpc代码(C#使用mqttnet)

主要的代码是下面这些

  public static void Main(string[] args)
        {
                var factory = new MqttFactory();
                mqttClient = factory.CreateMqttClient();

                //创建tcp options 用于连接mqtt
                var options = new MqttClientOptionsBuilder()
                            //clinet的名称
                            .WithClientId("client3")
                            //mqtt broker的服务器地址IP,默认的是1883端口
                            .WithTcpServer("192.168.137.131", 1883)
                            .WithCredentials("FOvUIh5e7sHD4BWKceb2", "")
                            //hivemq的登录用户名
                            .WithUserProperty("FOvUIh5e7sHD4BWKceb2", "")
                            .WithCleanSession()
                            .Build();
                mqttClient.ConnectAsync(options, CancellationToken.None);


                mqttClient.UseConnectedHandler(e =>
                {
                    Console.WriteLine("连接成功");
                    Console.WriteLine("当前连接线程名称:" + Thread.CurrentThread.ManagedThreadId.ToString());
                    mqttClient.SubscribeAsync("v1/devices/me/rpc/request/+");                   

                });
                mqttClient.UseApplicationMessageReceivedHandler(e =>
                {
                    string topic = e.ApplicationMessage.Topic;
                    Console.WriteLine("当前接受线程名称:" + Thread.CurrentThread.ManagedThreadId.ToString());
                    if (topic.StartsWith("v1/devices/me/rpc/request/"))
                    {
                        string requestId = topic.Substring("v1/devices/me/rpc/request/".Length);
                        JObject data = JObject.Parse(Encoding.UTF8.GetString(e.ApplicationMessage.Payload));
                        if (data.GetValue("method").ToString() == "getValue")
                        {
                            var temp = new JObject { { "temperature", temperature } };

                            var message = new MqttApplicationMessageBuilder()
                                .WithTopic("v1/devices/me/rpc/response/" + requestId)
                                .WithPayload(temp.ToString())
                                .WithExactlyOnceQoS()
                                .WithRetainFlag()
                                .Build();
                            //执行发送数据到tb
                            Console.WriteLine(DateTime.Now + "发送数据到tb");

                            mqttClient.PublishAsync(message, CancellationToken.None);

                        }
                        else if (data.GetValue("method").ToString() == "setValue")
                        {
                            string param = data.GetValue("params").ToString();

                            setValue(param);
                        }
                    }
                });
            Console.ReadLine();
        }

其中下面这些就是解析rpc命令的

if (topic.StartsWith("v1/devices/me/rpc/request/"))
                    {
                        string requestId = topic.Substring("v1/devices/me/rpc/request/".Length);
                        JObject data = JObject.Parse(Encoding.UTF8.GetString(e.ApplicationMessage.Payload));
                        if (data.GetValue("method").ToString() == "getValue")
                        {
                            var temp = new JObject { { "temperature", temperature } };

                            var message = new MqttApplicationMessageBuilder()
                                .WithTopic("v1/devices/me/rpc/response/" + requestId)
                                .WithPayload(temp.ToString())
                                .WithExactlyOnceQoS()
                                .WithRetainFlag()
                                .Build();
                            //执行发送数据到tb
                            Console.WriteLine(DateTime.Now + "发送数据到tb");

                            mqttClient.PublishAsync(message, CancellationToken.None);

                        }
                        else if (data.GetValue("method").ToString() == "setValue")
                        {
                            string param = data.GetValue("params").ToString();

                            setValue(param);
                        }
                    }
                });

PS:
这里没有实现将温控器设置的温度值回传到tb中,我感觉是线程的问题,因为我测试郭增加线程来处理数据回传就会导致不能获取到温控器的数据,这两个线程不是一个。
这里的问题留待后面研究。

python 版的mqtt

python版本的mqtt可以实现线程间的通信及实时监控,这里也分享给大家

# -*- coding: utf-8 -*-
"""
Created on Sat Feb  6 14:10:01 2021

"""

# This Program illustrates the Server Side RPC on ThingsBoard IoT Platform
# Paste your ThingsBoard IoT Platform IP and Device access token
# Temperature_Controller_Server_Side_RPC.py : This program illustrates Server side RPC using a Simulated Temperature Controller
import os
import time
import sys
import json
import random
import paho.mqtt.client as mqtt
from threading import Thread

# Thingsboard platform credentials
THINGSBOARD_HOST = '192.168.137.131'       #Change IP Address
ACCESS_TOKEN = 'FOvUIh5e7sHD4BWKceb2'
sensor_data = {'temperature': 25}

def publishValue(client):
    INTERVAL = 2
    print("Thread  Started")
    next_reading = time.time()
    while True:
        client.publish('v1/devices/me/telemetry', json.dumps(sensor_data),1)
        next_reading += INTERVAL
        sleep_time = next_reading - time.time()
        if sleep_time > 0:
            time.sleep(sleep_time)

def read_temperature():
    temp = sensor_data['temperature']
    return temp

# Function will set the temperature value in device
def setValue (params):
    sensor_data['temperature'] = params
    #print("Rx setValue is : ",sensor_data)
    print("Temperature Set : ",params,"C")

# MQTT on_connect callback function
def on_connect(client, userdata, flags, rc):
    print("连接成功")
    #print("rc code:", rc)
    client.subscribe('v1/devices/me/rpc/request/+')

# MQTT on_message callback function
def on_message(client, userdata, msg):
    print('Topic: ' + msg.topic + '\nMessage: ' + str(msg.payload))
    if msg.topic.startswith('v1/devices/me/rpc/request/'):
        requestId = msg.topic[len('v1/devices/me/rpc/request/'):len(msg.topic)]
        #print("requestId : ", requestId)
        data = json.loads(msg.payload)
        if data['method'] == 'getValue':
            #print("getvalue request\n")
            #print("sent getValue : ", sensor_data)
            client.publish('v1/devices/me/rpc/response/' + requestId, json.dumps(sensor_data['temperature']), 1)
        if data['method'] == 'setValue':
            #print("setvalue request\n")
            params = data['params']
            setValue(params)

# create a client instance
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.username_pw_set(ACCESS_TOKEN)
client.connect(THINGSBOARD_HOST,1883,60)

t = Thread(target=publishValue, args=(client,))


try:
    client.loop_start()
    t.start()
    while True:
        pass

except KeyboardInterrupt:
    client.disconnect()

标签:me,devices,v1,rpc,client,程序控制,thingsboard,data
来源: https://blog.csdn.net/yuyanchuan/article/details/113727664

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有