ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

2021-04-12

2021-04-12 21:07:23  阅读:157  来源: 互联网

标签:12 自定义 04 rabbitmq 2021 RabbitMQ public rabbitMQSinkProperties sink


Flink 程序Sink(数据输出)操作(5)自定义RabbitMq-Sink

自定义sink需要继承RichSinkFunction

ex:

public static class Demo extends RichSinkFunction<IN> {}

自定义RabbitMQ sink必要依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-rabbitmq_2.12</artifactId>
    <version>1.12.2</version>
</dependency>

如上依赖所示,其实已经是有rabbitMQ的连接器,但是,此连接器只能简单的Queue 模式,我们的业务需求可能不能直接使用Queue模式,比如需要发送到交换机中(Fanout、Driect)等。如果有这种场景呢,我们需要连接器的基础上再自定义RabbitMQ Sink了。

定义配置

# 计算结果输出RabbitMQ配置
sink.rabbitmq.host=10.50.40.116
sink.rabbitmq.port=5673
sink.rabbitmq.username=admin
sink.rabbitmq.password=admin
sink.rabbitmq.exchange=vehicle-alarm-over-speeding

配置对应实体类

我们一会使用代码,读取我们的sink配置为对象,作为参数不断传递

@NoArgsConstructor
@AllArgsConstructor
@Builder
@Data
public class RabbitMqSinkProperties implements Serializable {
    /**
     * rabbitMQ ip
     */
    private String host;
    /**
     * 端口
     */
    private int port;
    /**
     * 用户民
     */
    private String userName;
    /**
     * 密码
     */
    private String passWord;
    /**
     * 交换机名
     */
    private String exchange;


}

定义公共MQSink类继承RichSinkFunction

此类主要作用是作为一个RabbitMQ sink的中间模板,其中定义MQ sink 与RabbitMQ 的连接与关闭,交换机的类型指定等等。

我们某个具体的MQ sink 只需要继承此中间模板类,传输我们之前定义的配置对象,即可快速在invoke中完成对RabbitMQ数据的输出

package com.leilei.sink;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * @author lei
 * @version 1.0
 * @desc mq 公共模板类 我们我们如果有多个mq-sink  只需继承此类 即可
 * @date 2021-03-15 16:41
 */
@Slf4j
public class DataRichSinkFunction<IN> extends RichSinkFunction<IN> {

    // 配置对象  后续我们在定义具体实体类时用子类触发父类构造调用
    protected final RabbitMqSinkProperties rabbitMQSinkProperties;


    protected Connection connection;

    protected Channel channel;

    public DataRichSinkFunction(RabbitMqSinkProperties rabbitMQSinkProperties) {
        this.rabbitMQSinkProperties = rabbitMQSinkProperties;
    }


    /**
     * open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        // 设置RabbitMQ相关信息
        factory.setHost(rabbitMQSinkProperties.getHost());
        factory.setUsername(rabbitMQSinkProperties.getUserName());
        factory.setPassword(rabbitMQSinkProperties.getPassWord());
        factory.setPort(rabbitMQSinkProperties.getPort());

        // 创建一个新的连接
        connection = factory.newConnection();

        // 创建一个通道
        channel = connection.createChannel();

        // 声明交换机
        channel.exchangeDeclare(rabbitMQSinkProperties.getExchange(), BuiltinExchangeType.FANOUT, true);
    }

    /**
     * 关闭连接 flink程序从启动到销毁只会执行一次
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        super.close();
        channel.close();
        connection.close();
    }

}

根据业务定义sink类继承公共MQSink模板

由于公共MQSink模板类中已经对rabbitMq做了一个连接通道的开启和关闭,因此我们当前sink无需关系自身与Mq的连接与关闭,直接在invoke方法中,将数据输出到Mq即可

public class DemoSinkFunction extends DataRichSinkFunction<String> {

    public OverSpeedAlarmSinkFunction(RabbitMqSinkProperties rabbitMQSinkProperties) {
         /**
         * 调用父类(DataRichSinkFunction)构造,完成父类中属性填充
         */
        super(rabbitMQSinkProperties);
    }
    /**
     * 数据输出到 rabbitMQSinkProperties 指定的交换机中
     * @param value
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(String value, Context context) throws Exception {
        System.out.println(LocalDateTime.now() + "发送数据:" + value);
        channel.basicPublish(rabbitMQSinkProperties.getExchange(), "", null, value.getBytes(StandardCharsets.UTF_8));
    }
}

输出测试

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
//自定义数据源加载 
DataStreamSource<Location> source = env.addSource(new MyLocationSource());
//读取rabbitMq sink配置文件
String path = "RabbitMqSink.properties";
Props props = new Props(path);
RabbitMqSinkProperties sinkProperties = RabbitMqSinkProperties.builder()
    .host(props.getStr("sink.rabbitmq.host"))
    .port(props.getInt("sink.rabbitmq.port"))
    .userName(props.getStr("sink.rabbitmq.username"))
    .passWord(props.getStr("sink.rabbitmq.password"))
    .exchange(props.getStr("sink.rabbitmq.exchange"))
    .build();

// TODO  source进行数据处理 得到结果流
//将结果流用自定义的sink发送到rabbitmq
stream.addSink(new DemoSinkFunction(sinkProperties));

image-20210412204206021

image-20210412204224515

从控制台打印以及RabbitMQ-WEB页面看到,我们计算的结果,成功发送到了RabbitMQ的自定义交换机中, RabbitMQ SInk 示例完成!

标签:12,自定义,04,rabbitmq,2021,RabbitMQ,public,rabbitMQSinkProperties,sink
来源: https://blog.csdn.net/leilei1366615/article/details/115641027

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有