ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

springboot集成Hbase

2021-09-26 09:31:29  阅读:189  来源: 互联网

标签:集成 springboot tableName param IOException rowKey Hbase throws String


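前提:hbase(服务端)版本是1.2.6,安装详情参见《windows安装hbase》(Fullmark0608,CSDN博客)。注意:下文 Maven 依赖使用的 hbase-client 版本是 2.2.3,与服务端版本并不一致;示例代码用到了 2.x 客户端才提供的 API(如 TableDescriptorBuilder、CompareOperator),请根据自己的环境确认客户端与服务端版本的兼容性。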

1、添加maven

  <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.2.3</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

2、添加配置

# hbase 数据库配置
hbase.config.hbase.zookeeper.quorum=127.0.0.1
hbase.config.hbase.zookeeper.property.clientPort=2181

3、编写代码

/**
 * Holder for the settings declared under the {@code hbase} property prefix.
 *
 * <p>Every entry below {@code hbase.config.*} ends up in {@link #getConfig()} as a
 * key/value pair, e.g. {@code hbase.config.hbase.zookeeper.quorum=127.0.0.1} yields the key
 * {@code hbase.zookeeper.quorum} with the value {@code 127.0.0.1}.
 */
@ConfigurationProperties(prefix = "hbase")
public class HbaseProperties {
    /** Raw HBase client settings; stays {@code null} until {@link #setConfig(Map)} is called. */
    private Map<String, String> config;

    /**
     * Stores the HBase client settings.
     *
     * @param config HBase setting name to value
     */
    public void setConfig(Map<String, String> config) {
        this.config = config;
    }

    /**
     * Returns the HBase client settings.
     *
     * @return the stored map, or {@code null} when none has been set
     */
    public Map<String, String> getConfig() {
        return this.config;
    }
}
/**
 * Spring configuration that exposes the HBase client {@link Configuration} and a
 * {@link Connection} built from it as beans.
 *
 * <p>The Spring {@code @Configuration} annotation is written fully qualified because the
 * simple name {@code Configuration} refers to the Hadoop/HBase configuration class here.
 */
@EnableConfigurationProperties(HbaseProperties.class)
@org.springframework.context.annotation.Configuration
public class HbaseConfig {
    private final HbaseProperties prop;

    /**
     * @param properties the bound {@code hbase.*} properties
     */
    public HbaseConfig(HbaseProperties properties) {
        this.prop = properties;
    }

    /**
     * Builds the HBase client configuration: the defaults from
     * {@link HBaseConfiguration#create()} overlaid with every entry of
     * {@link HbaseProperties#getConfig()}.
     *
     * @return the HBase configuration
     */
    @Bean
    public Configuration configuration() {
        Configuration configuration = HBaseConfiguration.create();
        Map<String, String> config = prop.getConfig();
        // The map is null when no hbase.config.* property was set; in that case the
        // defaults created above are used unchanged instead of failing with an NPE.
        if (config != null) {
            config.forEach(configuration::set);
        }
        return configuration;
    }

    /**
     * Opens an HBase connection using {@link #configuration()}.
     *
     * @return a new connection
     * @throws IOException if the connection cannot be created
     */
    @Bean
    public Connection getConnection() throws IOException {
        return ConnectionFactory.createConnection(configuration());
    }
}
package com.catt.service;

import com.catt.config.HbaseConfig;
import com.catt.tools.system.JacksonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.*;

/**
 * Convenience wrapper around the HBase client for table administration and simple
 * row-level reads and writes.
 *
 * <p>The service opens its own {@link Connection} and {@link Admin} once in {@link #init()}
 * and keeps them in static fields; every data operation obtains a short-lived {@link Table}
 * from that connection and closes it again when done.
 *
 * <p>Strings (row keys, families, qualifiers, values) are converted with
 * {@link Bytes#toBytes(String)} on both the write and the read/delete paths so that the
 * same key always maps to the same bytes regardless of the platform default charset.
 *
 * @author fanghuanbiao
 */
@Service
@Slf4j
public class HbaseService {
    @Autowired
    private HbaseConfig config;

    private static Connection connection = null;
    private static Admin admin = null;

    /**
     * Creates the shared connection and admin handle the first time a service instance is
     * initialised. A failure is logged and not rethrown, which leaves the static fields
     * {@code null}; subsequent calls on this service will then fail.
     */
    @PostConstruct
    private void init() {
        if (connection != null) {
            return;
        }
        try {
            connection = ConnectionFactory.createConnection(config.configuration());
            admin = connection.getAdmin();
        } catch (IOException e) {
            log.error("HBase create connection failed:", e);
        }
    }

    /**
     * Shell equivalent: {@code create 'tableName','[Column Family 1]','[Column Family 2]'}.
     *
     * @param tableName      table name
     * @param columnFamilies column family names
     * @throws TableExistsException if {@link #tableExists(String)} reports the table as present
     * @throws IOException          if the HBase request fails
     */
    public void createTable(String tableName, String... columnFamilies) throws IOException {
        TableName name = TableName.valueOf(tableName);
        if (this.tableExists(tableName)) {
            // The leading space was missing, which glued the table name to the message.
            throw new TableExistsException(tableName + " is exists!");
        }
        List<ColumnFamilyDescriptor> columnFamilyList = new ArrayList<>(columnFamilies.length);
        for (String columnFamily : columnFamilies) {
            columnFamilyList.add(
                    ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamily)).build());
        }
        TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(name)
                .setColumnFamilies(columnFamilyList)
                .build();
        admin.createTable(tableDescriptor);
    }

    /**
     * Shell equivalent: {@code put 'tableName','rowKey','family:column','value'}.
     * Writes a single column by delegating to the multi-column overload.
     *
     * @param tableName    table name
     * @param rowKey       row key
     * @param columnFamily column family
     * @param column       column qualifier
     * @param value        value to store
     * @throws IOException if the HBase request fails
     */
    public void insertOrUpdate(String tableName, String rowKey, String columnFamily, String column, String value)
            throws IOException {
        this.insertOrUpdate(tableName, rowKey, columnFamily, new String[]{column}, new String[]{value});
    }

    /**
     * Writes several columns of one column family into one row with a single {@link Put}.
     *
     * <p>{@code columns[i]} is paired with {@code values[i]}. When {@code columns} is empty
     * nothing is written.
     *
     * @param tableName    table name
     * @param rowKey       row key
     * @param columnFamily column family
     * @param columns      column qualifiers
     * @param values       values, one per qualifier
     * @throws IllegalArgumentException if {@code columns} and {@code values} differ in length
     * @throws IOException              if the HBase request fails
     */
    public void insertOrUpdate(String tableName, String rowKey, String columnFamily, String[] columns, String[] values)
            throws IOException {
        // Reject a mismatch before touching HBase so that a row is never written partially.
        if (columns.length != values.length) {
            throw new IllegalArgumentException("columns and values must have the same length: "
                    + columns.length + " != " + values.length);
        }
        if (columns.length == 0) {
            return;
        }
        byte[] family = Bytes.toBytes(columnFamily);
        Put put = new Put(Bytes.toBytes(rowKey));
        for (int i = 0; i < columns.length; i++) {
            put.addColumn(family, Bytes.toBytes(columns[i]), Bytes.toBytes(values[i]));
        }
        // Send the Put once, after all columns were added, instead of once per column.
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            table.put(put);
        }
    }

    /**
     * Deletes a whole row.
     *
     * @param tableName table name
     * @param rowKey    row key
     * @throws IOException if the HBase request fails
     */
    public void deleteRow(String tableName, String rowKey) throws IOException {
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            table.delete(new Delete(Bytes.toBytes(rowKey)));
        }
    }

    /**
     * Deletes one column family of a row.
     *
     * @param tableName    table name
     * @param rowKey       row key
     * @param columnFamily column family to remove from the row
     * @throws IOException if the HBase request fails
     */
    public void deleteColumnFamily(String tableName, String rowKey, String columnFamily) throws IOException {
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        delete.addFamily(Bytes.toBytes(columnFamily));
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            table.delete(delete);
        }
    }

    /**
     * Shell equivalent: {@code delete 'tableName','rowKey','columnFamily:column'}.
     *
     * @param tableName    table name
     * @param rowKey       row key
     * @param columnFamily column family
     * @param column       column qualifier
     * @throws IOException if the HBase request fails
     */
    public void deleteColumn(String tableName, String rowKey, String columnFamily, String column) throws IOException {
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        delete.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            table.delete(delete);
        }
    }

    /**
     * Shell equivalent: {@code disable 'tableName'} followed by {@code drop 'tableName'}.
     * Does nothing when {@link #tableExists(String)} reports the table as absent.
     *
     * @param tableName table name
     * @throws IOException if the HBase request fails
     */
    public void deleteTable(String tableName) throws IOException {
        if (!this.tableExists(tableName)) {
            return;
        }
        TableName name = TableName.valueOf(tableName);
        admin.disableTable(name);
        admin.deleteTable(name);
    }

    /**
     * Shell equivalent: {@code get 'tableName','rowkey','family:column'}.
     *
     * @param tableName table name
     * @param rowkey    row key
     * @param family    column family
     * @param column    column qualifier
     * @return {@code null} when any argument is blank; otherwise the cell value as a string,
     *         or the empty string when no cell was returned or the lookup failed with an
     *         {@link IOException} (which is logged, not rethrown)
     */
    public String getValue(String tableName, String rowkey, String family, String column) {
        if (StringUtils.isBlank(tableName) || StringUtils.isBlank(family) || StringUtils.isBlank(rowkey)
                || StringUtils.isBlank(column)) {
            return null;
        }
        String value = "";
        // try-with-resources closes the table and, unlike a manual finally block, does not
        // dereference a null table when getTable(...) itself throws.
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Get get = new Get(Bytes.toBytes(rowkey));
            get.addColumn(Bytes.toBytes(family), Bytes.toBytes(column));
            List<Cell> cells = table.get(get).listCells();
            if (cells != null) {
                for (Cell cell : cells) {
                    value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                }
            }
        } catch (IOException e) {
            log.error("HBase get failed: table={}, rowkey={}, column={}:{}", tableName, rowkey, family, column, e);
        }
        return value;
    }

    /**
     * Shell equivalent: {@code get 'tableName','rowKey'}.
     *
     * <p>Prints row key, family, qualifier and value of every returned cell to standard
     * output, one per line.
     *
     * @param tableName table name
     * @param rowKey    row key
     * @return always {@code null}; the row content is only printed
     * @throws IOException if the HBase request fails
     */
    public String selectOneRow(String tableName, String rowKey) throws IOException {
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Result result = table.get(new Get(Bytes.toBytes(rowKey)));
            for (Cell cell : result.rawCells()) {
                // The get*Array() methods expose a backing array; only the slice described by
                // the matching offset/length belongs to the requested component.
                String row = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
                String columnFamily = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(),
                        cell.getFamilyLength());
                String column = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
                        cell.getQualifierLength());
                String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                // Could be mapped onto an object via reflection (column names matching Java properties).
                System.out.println(row);
                System.out.println(columnFamily);
                System.out.println(column);
                System.out.println(value);
            }
        }
        return null;
    }

    /**
     * Scans a table and prints every matching row key and its cells to standard output.
     *
     * <p>When {@code rowKeyFilter} is non-empty, only rows whose key <em>contains</em> it are
     * scanned: the filter is a {@link RowFilter} with a {@link SubstringComparator}, not a
     * prefix filter. NOTE(review): earlier documentation described this as a prefix match
     * ({@code scan 't1',{FILTER=>"PrefixFilter('2015')"}}); confirm which behaviour is intended.
     *
     * @param tableName    table name
     * @param rowKeyFilter substring the row key must contain; empty or {@code null} scans all rows
     * @return always {@code null}; the rows are only printed
     * @throws IOException if the HBase request fails
     */
    public String scanTable(String tableName, String rowKeyFilter) throws IOException {
        Scan scan = new Scan();
        if (!StringUtils.isEmpty(rowKeyFilter)) {
            scan.setFilter(new RowFilter(CompareOperator.EQUAL, new SubstringComparator(rowKeyFilter)));
        }
        try (Table table = connection.getTable(TableName.valueOf(tableName));
             ResultScanner scanner = table.getScanner(scan)) {
            for (Result result : scanner) {
                System.out.println(Bytes.toString(result.getRow()));
                for (Cell cell : result.rawCells()) {
                    System.out.println(cell);
                }
            }
        }
        return null;
    }


    /**
     * Checks whether a table exists by listing all table names and comparing each name's
     * string form with {@code tableName} (an intentionally indirect check).
     *
     * @param tableName table name
     * @return {@code true} if a listed table has exactly this name, {@code false} otherwise
     * @throws IOException if the HBase request fails
     */
    public boolean tableExists(String tableName) throws IOException {
        TableName[] tableNames = admin.listTableNames();
        if (tableNames == null) {
            return false;
        }
        for (TableName existing : tableNames) {
            if (tableName.equals(existing.getNameAsString())) {
                return true;
            }
        }
        return false;
    }
}

4、测试

示例:Student 数据表
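| 行键 | StuInfo:Name | StuInfo:Age | StuInfo:Sex | StuInfo:Class | Grades:BigData | Grades:Computer | Grades:Math | 时间戳 |
| ---- | ------------ | ----------- | ----------- | ------------- | -------------- | --------------- | ----------- | ------ |
| 0001 | Tom Green    | 18          | Male        |               | 80             | 90              | 85          | T2     |
| 0002 | Amy          | 19          |             | 01            | 95             |                 | 89          | T1     |
| 0003 | Allen        | 19          | Male        | 02            | 90             |                 | 88          | T1     |

(StuInfo 与 Grades 为两个列族,空白单元格表示该列没有值。)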
/**
 * Demo driver: boots the Spring application, reads a few cells and rows of the
 * {@code Student} table through {@link HbaseService}, prints them, and closes the context.
 *
 * @param args command-line arguments forwarded to Spring Boot
 */
public static void main(String[] args) {
        SpringApplicationBuilder builder = new SpringApplicationBuilder(HbaseFzpGxydApplication.class);
        ConfigurableApplicationContext context = builder.run(args);
        Object bean = context.getBean("hbaseService");
        HbaseService hbaseService = (HbaseService) bean;
        final String separator = "=================";
        try {
            // One-time setup for the demo data; enable on the first run:
            // hbaseService.createTable("Student", "StuInfo", "Grades");
            // hbaseService.insertOrUpdate("Student", "0001", "StuInfo", "name", "Tom Green");
            // hbaseService.insertOrUpdate("Student", "0001", "StuInfo", "Age", "18");
            String name = hbaseService.getValue("Student", "0001", "StuInfo", "name");
            System.out.println(separator + name);
            String age = hbaseService.getValue("Student", "0001", "StuInfo", "Age");
            System.out.println(separator + age);
            String firstRow = hbaseService.selectOneRow("Student", "0001");
            System.out.println(separator + firstRow);
            String secondRow = hbaseService.selectOneRow("Student", "0002");
            System.out.println(separator + secondRow);

        } catch (Exception e) {
            e.printStackTrace();
        }
        context.close();
    }

标签:集成,springboot,tableName,param,IOException,rowKey,Hbase,throws,String
来源: https://blog.csdn.net/qq_22165667/article/details/120460203

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有