ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

[蓝点无限] UWB 定位数据融合 之 上位机实现

2021-09-03 08:33:26  阅读:265  来源: 互联网

标签:self 蓝点 上位 tag result UWB serial data location


背景:

在前面两个博文中已经提及到,我们打算做一个UWB 结合运动传感器 融合定位,这篇博文实现上位机代码,上位机使用我们之前开源Python版本TWR上位机,代码可以在末尾论坛链接下载

我们的上位机实现基础是之前的《[开源项目] 蓝点无限 UWB Python版本上位机》,参考链接

https://www.cnblogs.com/tuzhuke/p/15170193.html 

再此基础上将《[蓝点无限] UWB 定位数据融合 之 固件实现

https://www.cnblogs.com/tuzhuke/p/15212574.html 实现完整UWB数据融合项目。

代码全部已经开源,欢迎大家使用并帮忙改进。

 

直接上代码,代码主要是在《[开源项目] 蓝点无限 UWB Python版本上位机》基础上修改,这里列出代码更改部分。

1 解析数据包中的运动变量,并存放到字典中

    result_dict = {'tag': 0x1005, 'acc':0, 'seq': 7, 'time': 1234, 'anthor_count': 4,'anthor': []}

    # 数据包以&&& 开头
    res = re.findall(r'&&&', string)
    flag = 1
    if len(res) > 0:
        # step1 print message length,ex 76
        temp_string = string.split("$")[0]  # &&&:80$
        data_len = int(temp_string.split(":")[1], 16)

        # tag info
        temp_string = string.split("$")[1]  # 000A:20
        tag_id = int(temp_string.split(":")[0], 16)  # 000A
        tag_acc = int(temp_string.split(":")[1], 16)
        tag_seq = int(temp_string.split(":")[2], 16)  # 20
        # print("标签ID: %02X  Seq: %X" % (tag_id, tag_seq))
        result_dict['tag'] = tag_id
        result_dict['acc'] = tag_acc
        result_dict['seq'] = tag_seq

 

2 在返回结果中,将运动信息一并返回给上层

def twr_main(input_string):
    print(input_string)
    error_flag, result_dic = Process_String_Before_Udp(input_string)
    if error_flag == 0:
        [location_result, location_seq, location_addr, location_x, location_y] = Compute_Location(result_dic)
        return location_result, location_seq, location_addr, location_x, location_y
    return 0, 0, 0, 0, 0

 

3 顶层收到定位结果和运动信息,打印结果,并发送给处理函数

                [location_result, location_seq, location_addr, location_x, location_y] = twr_main(msg)
                if location_result == 1:
                    self.data_result.emit(
                        '%d %d %0.2f %0.2f' % (location_seq, location_addr, location_x, location_y))
        # #                 bphero_dispose(str(data))

4 UWB和运动信息进行简单融合,当模块静止,不更新坐标信息

   def insert_result(self, input_str):
        strlist = input_str.split(' ')
        location_addr = int(strlist[1])
        location_x = float(strlist[2])
        location_y = float(strlist[3])
        tag_acc = int(strlist[4])
        print("acc = %d"%tag_acc)
        print("insert result")
        if tag_acc == 0:#只有模块移动的时候更新坐标
            self.Insert_Tag_Result(location_addr,
                                   {"x": location_x, "y": location_y, "z": 0, "qt": QGraphicsEllipseItem(-10, -10, 10, 10)})

 其他代码,上位机增加了串口接收功能

class ComThread(QtCore.QThread):
    data_result = QtCore.pyqtSignal(object)
    data_draf = QtCore.pyqtSignal(object)

    def __init__(self):
        super(ComThread, self).__init__()
        self.l_serial = None
        self.alive = False
        self.waitEnd = None
        self.ID = None
        self.data = None
        self.port = None

    def set_port(self,port):
        self.port = port
        print(self.port)

    def waiting(self):
        if not self.waitEnd is None:
            self.waitEnd.wait()

    def SetStopEvent(self):
        if not self.waitEnd is None:
            self.waitEnd.set()
        self.alive = False
        self.stop()

    def start(self):
        self.l_serial = serial.Serial()
        self.l_serial.port = self.port
        self.l_serial.baudrate = 115200
        self.l_serial.timeout = 2
        self.l_serial.open()
        if self.l_serial.isOpen():
            self.waitEnd = threading.Event()
            self.alive = True
            self.thread_read = None
            self.thread_read = threading.Thread(target=self.FirstReader)
            self.thread_read.setDaemon(1)
            self.thread_read.start()
            return True
        else:
            return False

    def SendDate(self, i_msg, send):
        lmsg = ''
        isOK = False
        if isinstance(i_msg):
            lmsg = i_msg.encode('gb18030')
        else:
            lmsg = i_msg
        try:
            # 发送数据到相应的处理组件
            self.l_serial.write(send)
        except Exception as ex:
            pass;
        return isOK

    def FirstReader(self):
        while self.alive:
            data = ''
            data = data.encode('utf-8')
            n = self.l_serial.inWaiting()
            if n:
                data = self.l_serial.readline()
                print(data)
                msg =str(data, encoding="utf-8")
                self.data_draf.emit(msg)  # for debug only

                [location_result, location_seq, location_addr, location_x, location_y, tag_acc] = twr_main(msg)
                print(tag_acc)
                if location_result == 1:
                    self.data_result.emit(
                        '%d %d %0.2f %0.2f %d' % (location_seq, location_addr, location_x, location_y, tag_acc))
        # #                 bphero_dispose(str(data))

        self.waitEnd.set()
        self.alive = False

    def stop(self):
        self.alive = False
        self.thread_read.join()
        if self.l_serial.isOpen():
            self.l_serial.close()

 

以上完成了整个近期打算开源的工程项目,源码请到www.51uwb.cn 下载 

 

 

 

 

 

 

  

标签:self,蓝点,上位,tag,result,UWB,serial,data,location
来源: https://www.cnblogs.com/tuzhuke/p/15221548.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有