ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Flask+Vue+Echarts+Mysql+websocket 实战(四)

2022-10-31 13:41:18  阅读:219  来源: 互联网

标签:Flask vue echarts mysql WebSocket 实战


一、后台编写

思路:本质是数据的展示,因此只是调用数据库查询方法给前端使用即可。由于物联网水质监测仪目前还未调试好,因此模拟数据采集到数据库,做一个定时器做数据插入的功能(预计本周可以调试完毕,到时候传感器会定时采集数据传入服务器数据库,和此效果相同,先预留读取接口),然后websocket保持对服务器的读取,查询到数据返回给前端。

db_operate.py

import sys
sys.path.append(r"E:pycharm2020projectsplatform1.0")
import json
import time
import random

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
import pymysql
import threading

from util.ecodings import class_to_dict, Decimal_and_DateEncoder
from util.settings import DevelopmentConfig

pymysql.install_as_MySQLdb()

app = Flask(__name__)
# 读取配置,包含数据库配置
app.config.from_object(DevelopmentConfig)

# 创建数据库sqlalchemy工具对象
db = SQLAlchemy(app)


# Flask从数据库已有表自动生成
# model flask-sqlacodegen "mysql+pymysql://root:root@120.78.94.58:3306/monitor_sys_data" --tables
# monitor_data --outfile "model.py" --flask
class MonitorDatum(db.Model):
    __tablename__ = monitor_data

    id = db.Column(db.Integer, primary_key=True)
    date_time = db.Column(db.DateTime)
    water_type = db.Column(db.Float(asdecimal=True))
    device_id = db.Column(db.Float(asdecimal=True))
    temperature = db.Column(db.Float(asdecimal=True))
    ph = db.Column(db.Float(asdecimal=True))
    solinity = db.Column(db.Float(asdecimal=True))
    dissolved_oxygen = db.Column(db.Float(asdecimal=True))
    light = db.Column(db.String(6))
    velocity = db.Column(db.String(6))
    data_type = db.Column(db.Float(asdecimal=True))


def insert():
    print("定时器启动了")
    print(threading.current_thread())  # 查看当前线程
    record_t = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    temp = round(random.uniform(15, 25), 2)  # 产生一个0-40之间的数字,保留两位小数
    ph = round(random.uniform(7, 8), 3)
    sol = round(random.uniform(100, 150), 1)
    dod = round(random.uniform(5, 8), 2)
    # 预留光照,如果是传入为字符串,需要将小数转为字符串,再插入数据库
    ins = MonitorDatum(date_time=record_t, temperature=temp, ph=ph, solinity=sol, dissolved_oxygen=dod, light=30)
    db.session.add(ins)
    db.session.commit()
    print(插入成功!)
    timer = threading.Timer(5, insert)  # 在insert函数结束之前我再开启一个定时器
    timer.start()


def create():
    # 创建所有表
    db.create_all()


def drop():
    # 删除所有表
    db.drop_all()


def query():
    # 清空缓存
    db.session.commit()
    # 查询最近一条数据
    # 只有最后加.all()才能读到实例,order_by和limit是条件查询
    new = db.session.query(MonitorDatum).order_by(MonitorDatum.id.desc()).limit(1).all()
    # [{temperature: 23.18, id: 5, record_t: datetime.datetime(2022, 10, 8, 10, 41, 35)}]  list
    result = class_to_dict(new)
    # 取的时间json.dumps无法对字典中的datetime时间格式数据进行转化。因此需要添加特殊日期格式转化
    result[0] = json.loads(json.dumps(result[0], cls=Decimal_and_DateEncoder))
    # print(result[0])  # {temperature: 23.18, id: 5, record_t: "2022-10-08 10:41:35"}
    # 由于数据库中光照的类型只能为字符型"light": 30,在这里需要把光照转为int型,才能传给前端显示
    result[0][light] = int(result[0][light])
    return result[0]
    # {"temperature": 23.72, "id": 16, "water_type": null, "solinity": 102.1, "light": 30,
    # "data_type": null, "device_id": null, "date_time": "2022-10-17 00:12:37", "ph": 7.731,
    # "dissolved_oxygen": 6.97, "velocity": null}  返回是一个字典


if __name__ == __main__:
    # res = query()
    # print(res)
    # # print(type(res))
    # print(res[date_time])
    # print(type(res[date_time]))
    # print(res[temperature])
    # print(type(res[temperature]))
    # 创建一个定时器,在程序运行在之后我开启一个insert函数
    t1 = threading.Timer(5, function=insert)  # 第一个参数是时间,例:过5s之后我执行后面的一个函数,开启一个线程
    t1.start()
# 控制台运行,5s定时向数据库插入
python db_operate.py

app_db_data.py

import sys

sys.path.append(r"E:pycharm2020projectsplatform1.0")
from db_manage.db_operate import query
from threading import Lock
from flask import Flask, render_template
from flask_socketio import SocketIO
from flask_cors import CORS

app = Flask(__name__)
CORS(app)  # 跨域问题
app.config[SECRET_KEY] = secret!
socketio = SocketIO()
socketio.init_app(app, async_mode=None, cors_allowed_origins=*)
thread = None
thread_lock = Lock()


# 后台线程 产生数据,即刻推送至前端
def background_thread():
    count = 0
    while True:
        socketio.sleep(5)
        res = query()
        # 1个时间,5个水质参数
        record_t = res[date_time]
        temperature = res[temperature]
        ph = res[ph]
        sol = res[solinity]
        dod = res[dissolved_oxygen]
        light = res[light]
        socketio.emit(server_response,
                      {
          
   data: [record_t, temperature, ph, sol, dod, light]},
                      namespace=/test)


@socketio.on(conn, namespace=/test)
def test_connect():
    print(触发)
    global thread
    with thread_lock:
        if thread is None:
            thread = socketio.start_background_task(target=background_thread)


if __name__ == __main__:
    socketio.run(app, host=127.0.0.1, port=5000, debug=True)
# 控制台运行,服务器此时会不断地向前端推送数据(1个时间,5个水质参数)
python app_db_data.py

这是程序定时插入的数据库数据,后面开始光照强度是固定“30”,其余的都不需要。

二、前端编写

1.总体架构

vue就不介绍了,之前的文章都有写过,这里挑重要点写: 首先创建好vue项目后,登录啥的我也不写了,直接写Home.vue(主页),包含header(顶部栏)、sidebar(侧边栏)和content(主要内容)3个组件,顶部栏和侧边栏都是固定的,变化的是content,项目结构如下: 本demo只写主界面中的基地总览和水质检测两部分,前端设计的还很烂,慢慢完善吧。

标签:Flask,vue,echarts,mysql,WebSocket,实战
来源:

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有