ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

产品管理服务--连接EMQ X

2021-11-21 10:31:59  阅读:163  来源: 互联网

标签:web hook -- makerknz server ## EMQ import 连接


文章目录

简介

从EMQ X中获取消息的方式比较多

  • 购买EMQ X的企业版,包括很多消息流转模块;

  • 创建个EMQ X超级用户,订阅所有的消息事件;

  • 使用规则引擎;

  • 移植EMQ X的kafaka插件;

  • 通过WebHook插件获取消息。

    image-20210728134815313

本文通过WebHook插件获取消息。设备的所有事件会通过webhook发送到产品服务器。

image-20211121090519234

WebHook介绍

WebHook 是由 emqx_web_hook (opens new window)插件提供的 将 EMQ X 中的钩子事件通知到某个 Web 服务的功能。

可以理解为EMQ X创建了一个客户端,这个客户端可以收集设备的在线、下下线记录、订阅与消息存储、消息送达确认等事件消息,通过钩子上挂载回调函数将事件发送到web器。

    Client      |    EMQ X     |  emqx_web_hook |   HTTP       +------------+
  =============>| - - - - - - -> - - - - - - - ->===========>  | Web Server |
                |    Broker    |                |  Request     +------------+
WebHook消息是单向的。

Webhook配置

web.hook.url

TypeValue
stringhttp://192.168.31.216:9200/emqx/webhook

Webhook 请求转发的目的 Web 服务器地址。

web.hook.headers.

web.hook.headers.content-type = application/json
web.hook.headers.accept = */*
web.hook.headers.webhook-username = makerknz
web.hook.headers.webhook-password = 123456

指定 HTTP 请求头部中的数据。<Key> 指定 HTTP 请求头部中的字段名,此配置项的值为相应的字段值。<Key> 可以是标准的 HTTP 请求头部字段,也可以自定义的字段,可以配置多个不同的请求头部字段。

webhook-username和webhook-password 作为访问后端的凭据,当emq x是一个集群时用来区别不同的服务器。当然认证也可以通过添加证书进行SSL认证,这部分感兴趣的同学可以自己探索。

触发规则

etc/plugins/emqx_web_hooks.conf 可配置触发规则,其配置的格式如下:

## 格式示例
web.hook.rule.<Event>.<Number> = <Rule>

## 示例值
web.hook.rule.message.publish.1 = {"action": "on_message_publish", "topic": "a/b/c"}
web.hook.rule.message.publish.2 = {"action": "on_message_publish", "topic": "foo/#"}
web.hook.rule.message.publish.2 = {"action": "on_message_publish"}

项目中会设置全部转发到后端服务器,不设置topic过滤。

Event 触发事件

目前支持以下事件:

名称说明执行时机
client.connect处理连接报文服务端收到客户端的连接报文时
client.connack下发连接应答服务端准备下发连接应答报文时
client.connected成功接入客户端认证完成并成功接入系统后
client.disconnected连接断开客户端连接层在准备关闭时
client.subscribe订阅主题收到订阅报文后,执行 client.check_acl 鉴权前
client.unsubscribe取消订阅收到取消订阅报文后
session.subscribed会话订阅主题完成订阅操作后
session.unsubscribed会话取消订阅完成取消订阅操作后
message.publish消息发布服务端在发布(路由)消息前
message.delivered消息投递消息准备投递到客户端前
message.acked消息回执服务端在收到客户端发回的消息 ACK 后
message.dropped消息丢弃发布出的消息被丢弃后

Number

同一个事件可以配置多个触发规则,配置相同的事件应当依次递增。

Rule

触发规则,其值为一个 JSON 字符串,其中可用的 Key 有:

  • action:字符串,取固定值
  • topic:字符串,表示一个主题过滤器,操作的主题只有与该主题匹配才能触发事件的转发

例如,我们只将与 a/b/cfoo/# 主题匹配的消息转发到 Web 服务器上,其配置应该为:

web.hook.rule.message.publish.1 = {"action": "on_message_publish", "topic": "a/b/c"}
web.hook.rule.message.publish.2 = {"action": "on_message_publish", "topic": "foo/#"}  

这样 Webhook 仅会转发与 a/b/cfoo/# 主题匹配的消息,例如 foo/bar 等,而不是转发 a/b/dfo/bar

WebFlux 介绍

WebFlux是Spring中的异步非阻塞的响应式web框架,EMQ X的设备有12种事件类型,对于单机安装的EMQ X并发连接11万会产品webhook的请求,后端服务器是同步MVC会导致请求堆积,使服务处理逻辑。

如果通过web.hook.pool_size 配置连接池数,又会导致数据的延迟。

img

MVC JDBC是一个同步请求的过程,每一次请求都会在一个线程内执行,等一次请求完成之后才能释放资源,如果在service方法中业务逻辑如果碰到io操作时间比较长的操作,这样这个service方法就会长时间占用tomcat容器线程池中的线程,这样是不利于其他请求的处理的,当线程池中的线程处理任务时,任务由于长时间io操作,肯定会阻塞线程处理其他任务。

WebFlux R2DBC是一个异步非阻塞的请求过程,此处会涉及大量的EMQX的请求,如果请求堆积,可以通过设置背压调整每次处理的数量来达到对请求的削峰处理。

## WebHook接口设计

增加WebFlux接口

添加依赖包

添加包WebFlux使用的包。r2dbc-mysql用来连接数据库。

    implementation('org.springframework.boot:spring-boot-starter-webflux')
    implementation('org.springframework.boot:spring-boot-starter-data-r2dbc')
    implementation('dev.miku:r2dbc-mysql')

配置

spring:
  r2dbc:
    url: r2dbcs:mysql://www.makerknz.cn:3306/product_manager?characterEncoding=utf-8&useSSL=false
    username: product_manager
    password: 123456

添加repository

WebHook目前设计只对外开放一个接口,仅用到device_events和device两张表,所以只写这两张表的repository即可。

DeviceEventsRepository

package cn.makerknz.product.server.repository;

import cn.makerknz.product.server.entity.DeviceEvents;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;

public interface DeviceEventsRepository extends ReactiveCrudRepository<DeviceEvents, Integer> {
}

DeviceRepository

package cn.makerknz.product.server.repository;

import cn.makerknz.product.server.entity.Device;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import reactor.core.publisher.Mono;

public interface DeviceRepository extends ReactiveCrudRepository<Device, Integer> {

    Mono<Device> findByClientId(String clientId);

}

service实现

在DeviceEventsServiceImpl增加实现

package cn.makerknz.product.server.service.impl;

import cn.makerknz.product.server.entity.DeviceEvents;
import cn.makerknz.product.server.mapper.DeviceEventsMapper;
import cn.makerknz.product.server.repository.DeviceEventsRepository;
import cn.makerknz.product.server.service.IDeviceEventsService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

/**
 * <p>
 *  服务实现类
 * </p>
 *
 * @author maker knz
 * @since 2021-10-27
 */

@Service
public class DeviceEventsServiceImpl extends ServiceImpl<DeviceEventsMapper, DeviceEvents> implements IDeviceEventsService {

    @Autowired
    private DeviceEventsRepository deviceEventsRepository;

    @Override
    public Mono<DeviceEvents> add(DeviceEvents deviceEvents) {
        return deviceEventsRepository.save(deviceEvents);
    }

}

在DeviceServiceImpl中增加实现

package cn.makerknz.product.server.service.impl;

import cn.makerknz.product.server.entity.Device;
import cn.makerknz.product.server.mapper.DeviceMapper;
import cn.makerknz.product.server.repository.DeviceRepository;
import cn.makerknz.product.server.service.IDeviceService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

/**
 * <p>
 *  服务实现类
 * </p>
 *
 * @author maker knz
 * @since 2021-10-27
 */
@Service
public class DeviceServiceImpl extends ServiceImpl<DeviceMapper, Device> implements IDeviceService {

    @Autowired
    private DeviceRepository deviceRepository;

    @Override
    public Mono<Device> findByClientId(String clientId) {
        return deviceRepository.findByClientId(clientId);
    }
}

对外接口

package cn.makerknz.product.server.controller;

import cn.makerknz.product.server.annotation.CheckEmqxWebhookIdentity;
import cn.makerknz.product.server.domain.enums.ResponseEnum;
import cn.makerknz.product.server.domain.form.EmqxWebhookForm;
import cn.makerknz.product.server.domain.vo.ResultVO;
import cn.makerknz.product.server.entity.DeviceEvents;
import cn.makerknz.product.server.service.IDeviceEventsService;
import cn.makerknz.product.server.service.IDeviceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

import java.util.UUID;

@RestController
@RequestMapping("/emqx")
public class EmqxController {

    @Autowired
    private IDeviceEventsService deviceEventsService;

    @Autowired
    private IDeviceService deviceService;

    @CheckEmqxWebhookIdentity
    @PostMapping("/webhook")
    public Mono<ResultVO<Object>> webhook(@RequestBody EmqxWebhookForm emqxWebhookForm) {

        return deviceService.findByClientId(emqxWebhookForm.getClientid()).flatMap(e -> {
            DeviceEvents deviceEvents = DeviceEvents.builder()
                    .productId(e.getProductId())
                    .data(emqxWebhookForm.toString())
                    .deviceId(e.getId())
                    .eventAction(emqxWebhookForm.getAction())
                    .streamId(UUID.randomUUID().toString())
                    .topic(emqxWebhookForm.getTopic())
                    .dataType(1)
                    .build();
            return deviceEventsService.add(deviceEvents).then(Mono.just(ResultVO.success()));
        }).defaultIfEmpty(ResultVO.error(ResponseEnum.ERROR));
    }

}

Webhook接口权限

原因

webhook接口如果不做权限限制任何人都可以访问,可能会导致数据恶意添加。

配置

对于分布式的EMQ X 每台可以配置不同的账户和密码

emqx:
  webhook:
    users:
      makerknz: 1234567
      makerknz_1: 12313123

读取配置

package cn.makerknz.product.server.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.Map;

@EnableConfigurationProperties
@Configuration
@ConfigurationProperties(prefix = "emqx.webhook")
@Data
public class EmqxConfig {

    private Map<String,String> users;

}

增加切面注解

package cn.makerknz.product.server.annotation;

import java.lang.annotation.*;

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.METHOD })
public @interface CheckEmqxWebhookIdentity {
}

Webhook接口认证实现

package cn.makerknz.product.server.auth;

import cn.makerknz.product.server.config.EmqxConfig;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import javax.servlet.http.HttpServletRequest;

/**
 * @Author: maker_knz
 * @Date: 2021/6/1/001 10:05
 * @Version 1.0
 */

@Aspect
@Component
public class EmqxWebhookIdentityAspect {

    @Autowired
    private EmqxConfig emqxConfig;

    /**
     * 切面验证webhook接口是否可以访问
     * @param point
     * @return
     */
    @Around("@annotation(cn.makerknz.product.server.annotation.CheckEmqxWebhookIdentity)")
    public Object checkLogin(ProceedingJoinPoint point) {

        try {
            HttpServletRequest request = this.getHttpServletRequest();

            // 1.从header中获取username和passowrd
            String username = request.getHeader("webhook-username");
            String password = request.getHeader("webhook-password");

            // 2.验证账户的有效性
            String userPassword = emqxConfig.getUsers().get(username);
            if (!userPassword.equals(password)) {
                throw new SecurityException("没有权限使用WebHook");
            }

            return point.proceed();
        } catch (Throwable throwable) {
            throw new SecurityException("WebHook配置错误");
        }
    }

    /**
     * 从请求头中获取HttpServletRequest
     * @return
     */
    private HttpServletRequest getHttpServletRequest() {
        RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
        ServletRequestAttributes servletRequestAttributes = (ServletRequestAttributes) requestAttributes;
        return servletRequestAttributes.getRequest();
    }

}

Webhook修改配置

docker ps
docker exec -it c46cb71330f1 bash
cd /opt/emqx/etc/plugins
vi vi emqx_web_hook.conf
exit
docker restart c46cb71330f1

修改配置文件为

##====================================================================
## WebHook
##====================================================================

## Webhook URL
##
## Value: String
web.hook.url = http://192.168.31.97:9200/emqx/webhook

## HTTP Headers
##
## Example:
## 1. web.hook.headers.content-type = application/json
## 2. web.hook.headers.accept = *
##
## Value: String
web.hook.headers.content-type = application/json

## 配置接口访问权限,不使用CA证书
web.hook.headers.webhook-username = makerknz
web.hook.headers.webhook-password = 123456

## The encoding format of the payload field in the HTTP body
## The payload field only appears in the on_message_publish and on_message_delivered actions
##
## Value: plain | base64 | base62
web.hook.body.encoding_of_payload_field = plain

##-----------------------------使用https请求---------------------------------------
## PEM format file of CA's
##
## Value: File
## web.hook.ssl.cacertfile  = <PEM format file of CA's>

## Certificate file to use, PEM format assumed
##
## Value: File
## web.hook.ssl.certfile = <Certificate file to use>

## Private key file to use, PEM format assumed
##
## Value: File
## web.hook.ssl.keyfile = <Private key file to use>

## Turn on peer certificate verification
##
## Value: true | false
## web.hook.ssl.verify = false

## If not specified, the server's names returned in server's certificate is validated against
## what's provided `web.hook.url` config's host part.
## Setting to 'disable' will make EMQ X ignore unmatched server names.
## If set with a host name, the server's names returned in server's certificate is validated
## against this value.
##
## Value: String | disable
## web.hook.ssl.server_name_indication = disable

## Connection process pool size
##
## Value: Number
## HTTP 连接进程池大小。
web.hook.pool_size = 32

## Whether to enable HTTP Pipelining
##
## See: https://en.wikipedia.org/wiki/HTTP_pipelining
web.hook.enable_pipelining = true

##--------------------------------------------------------------------
## Hook Rules
## These configuration items represent a list of events should be forwarded
##
## Format:
##   web.hook.rule.<HookName>.<No> = <Spec>
## 有客户端连接时触发
web.hook.rule.client.connect.1       = {"action": "on_client_connect"}
## EMQ X下发连接应答
web.hook.rule.client.connack.1       = {"action": "on_client_connack"}
## 客户端成功接入
web.hook.rule.client.connected.1     = {"action": "on_client_connected"}
## 客户端断开连接
web.hook.rule.client.disconnected.1  = {"action": "on_client_disconnected"}
## 客户端订阅事件
web.hook.rule.client.subscribe.1     = {"action": "on_client_subscribe"}
## 客户端取消订阅事件
web.hook.rule.client.unsubscribe.1   = {"action": "on_client_unsubscribe"}
## EMQ X确认订阅事件
web.hook.rule.session.subscribed.1   = {"action": "on_session_subscribed"}
## EMQ X确认取消订阅事件
web.hook.rule.session.unsubscribed.1 = {"action": "on_session_unsubscribed"}
## 会话终止
web.hook.rule.session.terminated.1   = {"action": "on_session_terminated"}
## 发布消息
web.hook.rule.message.publish.1      = {"action": "on_message_publish"}
## 消息投递成功
web.hook.rule.message.delivered.1    = {"action": "on_message_delivered"}
## 消息已应答
web.hook.rule.message.acked.1        = {"action": "on_message_acked"}

测试

接口测试

权限测试

总结

这里使用WebFlux可以很好的起到消息中间件的作用,这也是为什么不使用kafka作为消息件的原因。如果使用消息订阅的方式,后面对MQTT消息处理是一件复杂的过程。

标签:web,hook,--,makerknz,server,##,EMQ,import,连接
来源: https://blog.csdn.net/maker_knz/article/details/121450161

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有