ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

HBase实践:HBase2.x协处理器同步数据到数据仓库(那些你不知道的坑)

2021-07-05 23:59:48  阅读:247  来源: 互联网

标签:String HBase2 数据库 数据仓库 Server Coprocessor HBase 协处理器


目录

前言

一. 什么是协处理器

Observer Coprocessor

Endpoint Coprocessor

二.开发案列

总结

前言

HBase是基于Hadoop存储的一种超大型KV数据库,从字面意思可以看出HBase对KV结构支持比较友好,虽然现在还支持Phoenix查询,但是对于很多应用场景中,我们需要HBase廉价的存储和支持超大高并发查询的性能,但是我们不仅仅是想通过rowkey来获取对应的数据,还可能通过其他字段获取,什么我想像普通数据库一样求和,统计数量都难以达到,这种情况又要如何处理呢?

HBase自带的协处理器(Coprocessor)功能,很多公司对HBase进行了深度开发,利用Coprocessor实现了像标准数据库一样的操作,可以为我们提供一些HBase本身无法实现的功能。

一. 什么是协处理器

简单来说,Coprocessor是一个框架,这个框架可以让你很容易地在Region Server运行你的业务逻辑代码。

Coprocessor主要分为两种类型:Observer Coprocessor和EndPoint Coprocessor,如果和RDBMS做类比的话:

  • Observer Coprocessor –> RDBMS 中的触发器
  • EndPoint Coprocessor –> RDBMS中的存储过程

Observer Coprocessor

类似于传统数据库中的触发器,当发生某些事件的时候这类协处理器会被 Server 端调用。Observer Coprocessor 就是一些散布在 HBase Server 端代码中的 hook 钩子, 在固定的事件发生时被调用。比如:put 操作之前有钩子函数 prePut,该函数在 put 操作执 行前会被 Region Server 调用;在 put 操作之后则有 postPut 钩子函数。

Endpoint Coprocessor

类似传统数据库中的存储过程,客户端可以调用这些 Endpoint 协处 理器执行一段 Server 端代码,并将 Server 端代码的结果返回给客户端进一步处理,最常见 的用法就是进行聚集操作。如果没有协处理器,当用户需要找出一张表中的最大数据,即 max 聚合操作,就必须进行全表扫描,在客户端代码内遍历扫描结果,并执行求最大值的 操作。这样的方法无法利用底层集群的并发能力,而将所有计算都集中到 Client 端统一执行, 势必效率低下。利用 Coprocessor,用户可以将求最大值的代码部署到 HBase Server 端,HBase 将利用底层 cluster 的多个节点并发执行求最大值的操作。即在每个 Region 范围内执行求最 大值的代码,将每个 Region 的最大值在 Region Server 端计算出,仅仅将该 max 值返回给客 户端。在客户端进一步将多个 Region 的最大值进一步处理而找到其中的最大值。这样整体 的执行效率就会提高很多

二.开发案列

应用场景:

Mysql的维度表Binlog日志实时同步到HBase后,当维度表更新时想要第一时间获知,且想要同步到数仓表中该怎么办,这里利用了HBase Observer Coprocessor获取binlog的Update和Delete行为数据,在即将Put前同步到数仓中。

Maven:

<dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>2.1.9</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.1.9</version>
        </dependency>

        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>42.2.18</version>
        </dependency>

 开发代码:

这里使用的是HBase2.1.9版本,HBase2.x版本的API会有很大的变化,需要继承

RegionObserver, RegionCoprocessor两个接口,重写getRegionObserver,start,stop,prePut,preDelete方法.
public class GreenplumObserver implements RegionObserver, RegionCoprocessor {

    @Override
    public Optional<RegionObserver> getRegionObserver() {
        return Optional.of(this);
    }

    /**
     * 初始化Greenplum连接
     * @param env
     * @throws IOException
     */
    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        try {
            Class.forName("org.postgresql.Driver");
            connection = DriverManager.getConnection(GREENPLUM_URL, GREENPLUM_USER, GREENPLUM_PASSWORD);
            logger.info("初始化连接Greenplum...." + connection);
        } catch (SQLException | ClassNotFoundException throwables) {
            throwables.printStackTrace();
        }
        threadPoolExecutor = ThreadPoolUtil.getInstance();
    }

    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit, Durability durability) throws IOException {
        threadPoolExecutor.submit(new Runnable() {
            @Override
            public void run() {
                Map<byte[], List<Cell>> familyCellMap = put.getFamilyCellMap();
                HashMap<String, String> resultMap = new HashMap<>();
                //TODO 获取sql语句中的查询条件和更新条件
                for (Map.Entry<byte[], List<Cell>> entry : familyCellMap.entrySet()) {
                    for (Cell cell : entry.getValue()) {
                        String key = Bytes.toString(CellUtil.cloneQualifier(cell));
                        String value = Bytes.toString(CellUtil.cloneValue(cell));
                        if (arrayList.contains(key)) {
                            resultMap.put(key, value);
                        }
                    }
                }
                String queryCondition = resultMap.getOrDefault("query_conditions", null);
                if (!Strings.isNullOrEmpty(sinkTable)) {
                    String gpTables = hashMap.getOrDefault(sinkTable, null);
                    String[] split = gpTables.split("\\|", -1);
                    for (String table : split) {
                        if ("3".equals(operationAction)) {
                            String updatesql = "UPDATE dwd." + table + " SET " + updateInfo + " WHERE " + queryCondition;
                            logger.info("执行语句: " + updatesql);
                            GreenplumBulkOperator.addUpdateAndDeleteToDB(connection, updatesql);
                        } else if ("2".equals(operationAction)) {
                            String deletesql = "DELETE FROM dwd." + table + " WHERE " + queryCondition;
                            logger.info("执行语句: " + deletesql);
                            GreenplumBulkOperator.addUpdateAndDeleteToDB(connection, deletesql);
                        }
                    }
                }
            }
        });
    }

    @Override
    public void preDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete delete, WALEdit edit, Durability durability) throws IOException {
        logger.info("============= PreDelete ===================");
    }

    /**
     * 关闭Greenplum连接
     * @param env
     * @throws IOException
     */
    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
        logger.info("==================== END ==================");
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException throwables) {
                throwables.printStackTrace();
            }
        }
    }
}

总结

总而言之,利用HBase 协处理器的确帮我们解决了很多问题,比如上面的更新删除维度表或者现在很多利用协处理器来做二级索引,开发思路大致一样。

但是同时也面临着以下几点问题,这些都是我在HBase开发使用中遇到的问题:

  • 利用HBase存储+二级索引架构适用于超大数据量存储(十亿百亿级别),因为前端请求查询时会通过索引数据库映射得到rowkey再去查询HBase,这种情况势必会影响到查询性能。如果数据量不大,不建议这么使用,可以直接用Phoenix或者换一种数仓OLAP数据库,但是如果数据量达到数十亿上百亿这种规模,这种架构还是很香的。毕竟大部分数据库要在这个数量级使用相同服务器配置的话,HBase的优势立马体现了。
  • 因为同步到数据库其实是一种双写的操作,在双写的过程中无法完全保证数据一致性,这里要靠自己的开发手段去弥补。
  • 协处理器无法冗余历史数据,如果想重新计算,只能用过触发器来重新触发。

关注我,不迷路,带你们玩引擎,玩数据库,玩源码~

标签:String,HBase2,数据库,数据仓库,Server,Coprocessor,HBase,协处理器
来源: https://blog.csdn.net/BlackArmand/article/details/118500790

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有