Flink Stream Processing: Sink to HBase



TripDriveToHBaseSink

A RichSinkFunction that writes aggregated trip results (TripModel) to an HBase table through a BufferedMutator.

package pers.aishuang.flink.streaming.sink.hbase;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pers.aishuang.flink.streaming.entity.TripModel;
import pers.aishuang.flink.streaming.utils.DateUtil;

import java.io.IOException;

public class TripDriveToHBaseSink extends RichSinkFunction<TripModel> {
    private final static Logger logger = LoggerFactory.getLogger(TripDriveToHBaseSink.class);


    private String tableName;
    private Connection conn = null;
    private BufferedMutator mutator = null;

    public TripDriveToHBaseSink(String _tableName) {
        this.tableName = _tableName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Get the global job parameters from the runtime context
        ParameterTool globalJobParameters = (ParameterTool) getRuntimeContext()
                .getExecutionConfig().getGlobalJobParameters();
        // Read the HBase connection parameters (ZooKeeper quorum and client port)
        String zkQuorum = globalJobParameters.getRequired("zookeeper.quorum");
        String port = globalJobParameters.getRequired("zookeeper.clientPort");
        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
        conf.set(HConstants.ZOOKEEPER_QUORUM,zkQuorum);
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT,port);
        conf.set(TableInputFormat.INPUT_TABLE,tableName);

        org.apache.hadoop.conf.Configuration hbaseConf = HBaseConfiguration.create(conf);
        // Create the connection via the connection factory
        conn = ConnectionFactory.createConnection(hbaseConf);
        // Configure how large the write buffer may grow and how long before it is flushed to HBase.
        // Buffered writes to HBase work much like a Kafka producer's batched sends.
        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tableName));
        // Flush once the buffer reaches 10 MB
        params.writeBufferSize(10*1024*1024L);
        // Flush at least every 5 seconds
        params.setWriteBufferPeriodicFlushTimeoutMs(5*1000L);
        // Obtain the table's BufferedMutator from the connection
        try {
            mutator = conn.getBufferedMutator(params);
        } catch (IOException e) {
            logger.error("当前获取bufferedMutator 失败:" + e.getMessage());
        }
    }

    // Override invoke: write each incoming record to HBase
    @Override
    public void invoke(TripModel value, Context context) throws Exception {
        // Build a Put from the record and hand it to the BufferedMutator
        try {
            Put put = setDataSourcePut(value);
            mutator.mutate(put);
            // Force-flush the buffered mutations to HBase (note: flushing every record bypasses the 10 MB / 5 s buffering configured in open())
            mutator.flush();
        } catch (Exception ex) {
            logger.error("写入到hbase失败:" + ex.getMessage());
        }
    }

    // Override close: release resources
    @Override
    public void close() throws Exception {
        // Release the HBase table and connection resources
        if (mutator != null) mutator.close();
        if (conn != null && !conn.isClosed()) conn.close();
    }

    /**
     * Builds one Put per record:
     * 1. table name  2. rowkey  3. column family  4. column names and values
     *
     * @param tripModel the trip aggregate to persist
     * @return the populated Put
     */
    private Put setDataSourcePut(TripModel tripModel) {
        String rowKey = tripModel.getVin() + "_" + DateUtil.convertStringToDate(tripModel.getTripStartTime()).getTime();
        String cf = "cf";
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("vin"), Bytes.toBytes(tripModel.getVin()));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("lastSoc"), Bytes.toBytes(String.valueOf(tripModel.getLastSoc())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("lastMileage"), Bytes.toBytes(String.valueOf(tripModel.getLastMileage())));

        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("tripStartTime"), Bytes.toBytes(tripModel.getTripStartTime()));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("start_BMS_SOC"), Bytes.toBytes(String.valueOf(tripModel.getStart_BMS_SOC())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("start_longitude"), Bytes.toBytes(String.valueOf(tripModel.getStart_longitude())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("start_latitude"), Bytes.toBytes(String.valueOf(tripModel.getStart_latitude())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("start_mileage"), Bytes.toBytes(String.valueOf(tripModel.getStart_mileage())));

        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("end_BMS_SOC"), Bytes.toBytes(String.valueOf(tripModel.getEnd_BMS_SOC())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("end_longitude"), Bytes.toBytes(String.valueOf(tripModel.getEnd_longitude())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("end_latitude"), Bytes.toBytes(String.valueOf(tripModel.getEnd_latitude())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("end_mileage"), Bytes.toBytes(String.valueOf(tripModel.getEnd_mileage())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("tripEndTime"), Bytes.toBytes(tripModel.getTripEndTime()));

        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("mileage"), Bytes.toBytes(String.valueOf(tripModel.getMileage())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("max_speed"), Bytes.toBytes(String.valueOf(tripModel.getMax_speed())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("soc_comsuption"), Bytes.toBytes(String.valueOf(tripModel.getSoc_comsuption())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("time_comsuption"), Bytes.toBytes(String.valueOf(tripModel.getTime_comsuption())));

        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("total_low_speed_nums"), Bytes.toBytes(String.valueOf(tripModel.getTotal_low_speed_nums())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("total_medium_speed_nums"), Bytes.toBytes(String.valueOf(tripModel.getTotal_medium_speed_nums())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("total_high_speed_nums"), Bytes.toBytes(String.valueOf(tripModel.getTotal_high_speed_nums())));

        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("Low_BMS_SOC"), Bytes.toBytes(String.valueOf(tripModel.getLow_BMS_SOC())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("Medium_BMS_SOC"), Bytes.toBytes(String.valueOf(tripModel.getMedium_BMS_SOC())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("High_BMS_SOC"), Bytes.toBytes(String.valueOf(tripModel.getHigh_BMS_SOC())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("Low_BMS_Mileage"), Bytes.toBytes(String.valueOf(tripModel.getLow_BMS_Mileage())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("Medium_BMS_Mileage"), Bytes.toBytes(String.valueOf(tripModel.getMedium_BMS_Mileage())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("High_BMS_Mileage"), Bytes.toBytes(String.valueOf(tripModel.getHigh_BMS_Mileage())));

        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("tripStatus"), Bytes.toBytes(String.valueOf(tripModel.getTripStatus())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("processTime"), Bytes.toBytes(DateUtil.getCurrentDateTime()));
        return put;
    }
}
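
For context, below is a minimal sketch (not part of the original post) of how this sink might be wired into a Flink job. Only TripDriveToHBaseSink, TripModel, and the two parameter keys ("zookeeper.quorum", "zookeeper.clientPort") come from the code above; the job class, source, and table name are hypothetical placeholders, and TripModel is assumed to have a no-arg constructor.

package pers.aishuang.flink.streaming.job; // hypothetical package

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import pers.aishuang.flink.streaming.entity.TripModel;
import pers.aishuang.flink.streaming.sink.hbase.TripDriveToHBaseSink;

public class TripDriveJobSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The sink's open() reads "zookeeper.quorum" and "zookeeper.clientPort" from the
        // global job parameters, so they must be registered on the execution environment.
        ParameterTool params = ParameterTool.fromArgs(args);
        env.getConfig().setGlobalJobParameters(params);

        // Stand-in for the real upstream trip aggregation (assumes TripModel has a default constructor).
        DataStream<TripModel> tripStream = env.fromElements(new TripModel());

        // The constructor argument selects the HBase table; the name here is illustrative.
        tripStream.addSink(new TripDriveToHBaseSink("trip_drive"));

        env.execute("TripDriveToHBase");
    }
}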

TripSampleToHBaseSink

A RichSinkFunction that writes the raw per-sample trip fields (String[]) to an HBase table through a BufferedMutator.

package pers.aishuang.flink.streaming.sink.hbase;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pers.aishuang.flink.streaming.utils.DateUtil;
import pers.aishuang.flink.streaming.utils.StringUtil;

import java.io.IOException;

public class TripSampleToHBaseSink extends RichSinkFunction<String[]> {
    // Logger
    private final static Logger logger = LoggerFactory.getLogger(TripSampleToHBaseSink.class);

    // Target table name
    private String tableName;
    // HBase connection
    org.apache.hadoop.hbase.client.Connection conn = null;
    // Buffered writer for the table
    BufferedMutator mutator = null;

    // Constructor taking the target table name
    public TripSampleToHBaseSink(String _tableName) {
        this.tableName = _tableName;
    }

    // Override open: set up the HBase connection and buffered writer
    @Override
    public void open(Configuration parameters) throws Exception {
        // 1. Get the global job parameters from the runtime context
        ParameterTool globalJobParameters = (ParameterTool) getRuntimeContext()
                .getExecutionConfig()
                .getGlobalJobParameters();
        // 2. Read the HBase connection parameters
        // -- ZooKeeper quorum (the ZK ensemble addresses)
        String zkQuorum = globalJobParameters.getRequired("zookeeper.quorum");
        // -- ZooKeeper client port
        String port = globalJobParameters.getRequired("zookeeper.clientPort");
        // -- Create the Hadoop configuration
        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
        // -- Apply the connection parameters
        conf.set(HConstants.ZOOKEEPER_QUORUM,zkQuorum);
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT,port);
        conf.set(TableInputFormat.INPUT_TABLE,tableName);

        org.apache.hadoop.conf.Configuration hbaseConf = HBaseConfiguration.create(conf);
        // 3. Create the connection via the connection factory
        conn = ConnectionFactory.createConnection(hbaseConf);
        // -- Configure how large the write buffer may grow and how often it is flushed to HBase
        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tableName));
        // -- Write buffer size: 10 MB
        params.writeBufferSize(10*1024*1024L);
        // -- Periodic flush timeout: 5 s
        params.setWriteBufferPeriodicFlushTimeoutMs(5*1000L);
        // 4. Obtain the table's BufferedMutator from the connection
        try {
            mutator = conn.getBufferedMutator(params);
        } catch (IOException e) {
            logger.error("当前获取bufferedMutator 失败:" + e.getMessage());
        }
    }

    // 5. Override invoke: write each incoming record to HBase
    @Override
    public void invoke(String[] value, Context context) throws Exception {
        // -- Build a Put from the record and hand it to the BufferedMutator
        try {
            Put put = setDataSourcePut(value);
            mutator.mutate(put);
            // -- Force-flush the buffered mutations to HBase
            mutator.flush();
        } catch (IOException e) {
            logger.error("写入到HBase失败:" + e.getMessage());
        }
    }

    // Override close: release resources
    @Override
    public void close() throws Exception {
        // Release the HBase table and connection resources
        if(mutator != null) mutator.close();
        if( conn != null ) conn.close();
    }


    /**
     * Builds one Put per record:
     * 1. table name  2. rowkey  3. column family  4. column names and values
     *
     * @param tripDriveArr the raw trip sample fields
     * @return the populated Put
     */
    private Put setDataSourcePut(String[] tripDriveArr) {
        // 1. Rowkey design: VIN + reversed timestamp (see the note after this class)
        String rowkey = tripDriveArr[0] + StringUtil.reverse(tripDriveArr[1]);
        // 2. Instantiate the Put from the rowkey
        Put put = new Put(Bytes.toBytes(rowkey));
        // 3. Column family name
        String cf = "cf";
        put.addColumn(Bytes.toBytes(cf),Bytes.toBytes("vin"),Bytes.toBytes(tripDriveArr[0]));
        put.addColumn(Bytes.toBytes(cf),Bytes.toBytes("terminalTimeStamp"),Bytes.toBytes(tripDriveArr[1]));
        put.addColumn(Bytes.toBytes(cf),Bytes.toBytes("soc"),Bytes.toBytes(tripDriveArr[2]));
        put.addColumn(Bytes.toBytes(cf),Bytes.toBytes("mileage"),Bytes.toBytes(tripDriveArr[3]));
        put.addColumn(Bytes.toBytes(cf),Bytes.toBytes("speed"),Bytes.toBytes(tripDriveArr[4]));
        put.addColumn(Bytes.toBytes(cf),Bytes.toBytes("gps"),Bytes.toBytes(tripDriveArr[5]));
        put.addColumn(Bytes.toBytes(cf),Bytes.toBytes("terminalTime"),Bytes.toBytes(tripDriveArr[6]));
        put.addColumn(Bytes.toBytes(cf),Bytes.toBytes("processTime"),Bytes.toBytes(DateUtil.getCurrentDateTime()));

        return put;
    }
}
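
As noted in setDataSourcePut, the rowkey is the VIN followed by the terminal timestamp with its characters reversed, a common HBase pattern for keeping key suffixes from increasing monotonically. The project's StringUtil.reverse is not shown in the post; a plausible stand-in, assuming it simply reverses the character order of the string, is sketched below.

// Hypothetical stand-in for pers.aishuang.flink.streaming.utils.StringUtil;
// the real project class is not shown in the post and may differ.
public class StringUtil {
    // Returns the input with its characters in reverse order,
    // e.g. "1636300800000" -> "0000080036361".
    public static String reverse(String input) {
        if (input == null) {
            return null;
        }
        return new StringBuilder(input).reverse().toString();
    }
}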

Source: https://www.cnblogs.com/zi-shuo/p/15522500.html
