ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

HBase的API使用

2021-05-04 17:33:55  阅读:217  来源: 互联网

标签:hbase org hadoop t1 API 使用 import HBase getBytes


目录

DDL

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.junit.Before;
import org.junit.Test;

public class DDLDemo {
	
	Connection conn = null;
	
	@Before
	public void init() throws Exception {
		
		// Configuration conf = new Configuration();会自动加载classpath中的core-site.xml,core-default.xml,
		// hdfs-site.xml,hdfs-default.xml,yarn-site.xml,yarn-default.xml,mpred-site.xml,mpred-default.xml
		Configuration conf = HBaseConfiguration.create();// 除了hadoop的配置,还会加载hbase-site.xml
		
		// 客户端连接不需要指定具体的master或regionserver地址,只需要指定zookeeper地址就行
		conf.set("hbase.zookeeper.quorum", "linux01:2181,linux02:2181,linux03:2181");
		
		conn = ConnectionFactory.createConnection(conf);
		
	}
	
	// 创建名称空间
	@Test
	public void testCreateNameSpace() throws Exception {
		
		// 获取表定义管理器
		Admin admin = conn.getAdmin();
		
		NamespaceDescriptor myspace = NamespaceDescriptor.create("myspace").build();
		admin.createNamespace(myspace);
		
		admin.close();
		conn.close();
	}
	
	// 创建表
	@Test
	public void testCreateTable() throws Exception {
		
		Admin admin = conn.getAdmin();
		
		// 构建一个表定义描述对象构建器
//		TableDescriptorBuilder tbBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf("myspace", "t1"));
		TableDescriptorBuilder tbBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf("myspace", "t1"));
		
		// 构建一个列族描述对象构造器
		ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder("f1".getBytes());
		// 为列族定义设置参数
		cfBuilder.setBloomFilterType(BloomType.ROWCOL);
		cfBuilder.setTimeToLive(6000);
		cfBuilder.setMaxVersions(3);
		// 获取列族描述对象
		ColumnFamilyDescriptor f1 = cfBuilder.build();
		
		// 用表构建器设置列族,并构建表描述对象
		TableDescriptor t1 = tbBuilder.setColumnFamily(f1).build();
		
		// 用表定义管理器创建表
		admin.createTable(t1);
		
		admin.close();
		conn.close();
	}
	
	// 创建预分区表
	@Test
	public void testCreateTableSplit() throws Exception {
		
		Admin admin = conn.getAdmin();
		
		TableDescriptorBuilder tbBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf("myspace:t1"));
		
		ColumnFamilyDescriptor f1 = ColumnFamilyDescriptorBuilder.newBuilder("f1".getBytes()).build();
		
		TableDescriptor t1 = tbBuilder.setColumnFamily(f1).build();
		
		byte[][] splitKeys = {"r005".getBytes(), "r008".getBytes()};
		
		// 指定预分区的分界点
		admin.createTable(t1, splitKeys);
		
		admin.close();
		conn.close();
	}
	
	// 修改表定义
	@Test
	public void modifyTableDescription() throws Exception {
		
		Admin admin = conn.getAdmin();		
		
		// 1. 修改之前的列族定义
		// 获得表定义
		TableDescriptor t1 = admin.getDescriptor(TableName.valueOf("myspace:t1"));
		// 从表定义中取出列族f1的定义
		ColumnFamilyDescriptor f1 = t1.getColumnFamily("f1".getBytes());
		// 用列族定义构建起对原来的列族f1定义进行修改
		f1 = ColumnFamilyDescriptorBuilder.newBuilder(f1).setTimeToLive(Integer.MAX_VALUE).build();
		
		// 修改指定表的列族定义
		admin.modifyColumnFamily(TableName.valueOf("myspace:t1"), f1);
		
		// 2. 增加一个新的列族
		// 构建一个新的列族定义
		ColumnFamilyDescriptor f2 = ColumnFamilyDescriptorBuilder.newBuilder("f2".getBytes()).build();
		// 表定义修改过,再取一次
		t1 = admin.getDescriptor(TableName.valueOf("myspace:t1"));
		// 对原来的表定义设置新的列族
		TableDescriptorBuilder tbBuilder = TableDescriptorBuilder.newBuilder(t1);
		tbBuilder.setColumnFamily(f2);
		t1 = tbBuilder.build();
		
		// 通过客户端发送定义修改命令
		admin.modifyTable(t1);
		
		admin.close();
		conn.close();
	}
	
	// 删除列族/表/名称空间
	@Test
	public void delete() throws Exception {
		
		Admin admin = conn.getAdmin();
		
//		// 删除列族
//		admin.deleteColumnFamily(TableName.valueOf("myspace:t1"), "f1".getBytes());
//		
//		// 禁用一个表
//		admin.disableTable(TableName.valueOf("myspace:t1"));
//		
//		// 删除表
//		admin.deleteTable(TableName.valueOf("myspace:t1"));
		
		// 删除名称空间,必须先把其中的表全部删掉
		admin.deleteNamespace("myspace");
		admin.close();
		conn.close();
	}
	
	// 列出所有名称空间/表
	@Test
	public void testList() throws Exception {
		
		Admin admin = conn.getAdmin();
		
		// 列出名称空间
		NamespaceDescriptor[] namespaceDescriptors = admin.listNamespaceDescriptors();
		for (NamespaceDescriptor namespaceDescriptor : namespaceDescriptors) {
			System.out.println(namespaceDescriptor.getName());
		}
		
		// 列出表
		TableName[] tableNames = admin.listTableNames();
		for (TableName tableName : tableNames) {
			System.out.println(tableName.getNameAsString());
		}
		
		admin.close();
		conn.close();
	}
}

DML

import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import org.junit.Test;

public class DMLDemo {
	
	Connection conn = null;
	
	@Before
	public void init() throws Exception {
		
		Configuration conf = HBaseConfiguration.create();
		conf.set("hbase.zookeeper.quorum", "linux01:2181,linux02:2181,linux03:2181");
		conn = ConnectionFactory.createConnection(conf);
		
	}
	
	// 插入数据
	@Test
	public void testPut() throws Exception {
		
		// 用conn获取表数据操作对象
		Table t1 = conn.getTable(TableName.valueOf("t1"));
		
		// 构造数据封装对象
		Put r001 = new Put("r001".getBytes());
		r001.addColumn("f1".getBytes(), "name".getBytes(), "Aang".getBytes());
		r001.addColumn("f1".getBytes(), "age".getBytes(), Bytes.toBytes(12));
		
		Put r002 = new Put("r002".getBytes());
		r002.addColumn("f1".getBytes(), "name".getBytes(), "Katara".getBytes());
		r002.addColumn("f1".getBytes(), "age".getBytes(), Bytes.toBytes(14));
		
//		t1.put(r001);
		
		ArrayList<Put> puts = new ArrayList<Put>();
		puts.add(r001);
		puts.add(r002);
		
		t1.put(puts);
		
		t1.close();
		conn.close();
	}
	
	/**
	 * 	比较mutator和table.put
	 * 	1. mutator是一个异步操作,客户端先把数据写入本地的缓存,即返回,客户端不需要同步等待数据插入完成;而put是需要同步等待的
	 * 	2. mutator把数据写入本地缓存后,攒满一批再提交到hbase写入,可以提高数据插入的效率
	 * 	put(List<Put>)相当于自己用lst将数据缓存起来,然后用put方法同步提交
	 * 	对于静态批量数据(比如hdfs中已经存在的一堆文件)快速导入hbase,还有更高效的方法:Bulkloader
	 * 	原理:不需要通过网络RPC请求来提交数据,而是直接将原始文件转换成hbase的底层文件HFILE,然后直接上传到hbase的表目录中
	 */
	@Test
	public void testPut2() throws Exception {
		
		// put方法是一个同步操作,客户端如果有大量数据需要集中密集写入hbase表,客户端程序需要等待put全部完成
		// 而BufferedMutator则允许客户端设置一个缓冲区,提交的数据先放在缓冲区,后面会异步提交到hbase集群
		BufferedMutator bufferedMutator = conn.getBufferedMutator(TableName.valueOf("t1".getBytes()));
		
		long start = System.currentTimeMillis();
		for(int i = 0; i < 10000; i++) {
			Put r = new Put(("r01" + i).getBytes());
			r.addColumn("f1".getBytes(), "q1".getBytes(), Bytes.toBytes(i));
			
			// 写入客户端的缓存,后续会按周期提交到hbase集群
			bufferedMutator.mutate(r);
		}
		long end = System.currentTimeMillis();
		System.out.println(end - start);// 889
		
		bufferedMutator.close();
		
		
		Table t1 = conn.getTable(TableName.valueOf("t1".getBytes()));
		
		start = System.currentTimeMillis();
		for(int i = 0; i < 10000; i++) {
			Put r = new Put(("r02" + i).getBytes());
			r.addColumn("f1".getBytes(), "q1".getBytes(), Bytes.toBytes(i));
			t1.put(r);
		}
		end = System.currentTimeMillis();
		System.out.println(end - start);// 17499
		
		t1.close();
		conn.close();
	}
	
	// 删除表中的整行/某行的整列族/某行的某个列
	@Test
	public void testDeleteData() throws Exception {
		
		Table t1 = conn.getTable(TableName.valueOf("t1"));
		
		// delete参数对象中,如果只指定行键,则会删除整行的所有key-values
		Delete delete1 = new Delete("r001".getBytes());
		t1.delete(delete1);
		
		// delete参数对象中,指定了行键+列族,则会删除该行的指定列族中的key-values
		Delete delete2 = new Delete("r002".getBytes());
		delete2.addFamily("f1".getBytes());
		t1.delete(delete2);
		
		// delete参数对象中,指定了行键+列族+列名,则会删除该列
		Delete delete3 = new Delete("r003".getBytes());
		delete3.addColumn("f1".getBytes(), "q1".getBytes());
		t1.delete(delete3);
		
		t1.close();
		conn.close();
	}
	
	// 清空整个表的数据,会保留表定义,还可以保留表的region划分
	@Test
	public void testTruncate() throws Exception {
		
		Admin admin = conn.getAdmin();
		
		admin.disableTable(TableName.valueOf("t1".getBytes()));
		
		admin.truncateTable(TableName.valueOf("t1".getBytes()), true);
		
		admin.close();
		conn.close();
	}
}
import java.util.ArrayList;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.junit.Before;
import org.junit.Test;

public class QueryDemo {
	
	Connection conn = null;
	
	@Before
	public void init() throws Exception {
		
		Configuration conf = HBaseConfiguration.create();
		conf.set("hbase.zookeeper.quorum", "linux01:2181,linux02:2181,linux03:2181");
		conn = ConnectionFactory.createConnection(conf);
				
	}
	
	// get
	@Test
	public void testGet() throws Exception {
		
		Table t1 = conn.getTable(TableName.valueOf("t1"));
		
		
		Get get = new Get("r001".getBytes());
		
		// 过滤要返回的数据,满足条件的才返回
		get.setFilter(null);
		
		// Result是对一行中kv数据的封装
		Result result = t1.get(get);
		
		// 清楚数据schema的情况下,直接取某个key的value
//		byte[] value = result.getValue("f1".getBytes(), "name".getBytes());
//		System.out.println(new String(value));
		
		// 不清楚数据schema或每行的schema不一致的情况下,从result中遍历出每一个key-value
		while(result.advance()) {
			Cell cell = result.current();
			printCellUtil(cell);
		}
        
		t1.close();
		conn.close();
	}
	
	// 一次RPC请求,get多行数据
	@Test
	public void testGetSome() throws Exception {
		
		Table t1 = conn.getTable(TableName.valueOf("t1"));
		
		Get get1 = new Get("r001".getBytes());
		Get get2 = new Get("r002".getBytes());
		Get get3 = new Get("r003".getBytes());
		
		ArrayList<Get> gets = new ArrayList<Get>();
		gets.add(get1);
		gets.add(get2);
		gets.add(get3);
		
		Result[] results = t1.get(gets);
		for (Result result : results) {
			while(result.advance()) {
				Cell cell = result.current();
				printCellUtil(cell);
			}
		}
        
        t1.close();
        conn.close();
	}
	
	// scan
	@Test
	public void testScan() throws Exception {
		
		Table t1 = conn.getTable(TableName.valueOf("t1"));
		
		Scan scan = new Scan();
		
		// 指定扫描起始行键,默认包含
		scan.withStartRow("r001".getBytes());
		
		// 指定扫描结束行键,默认不包含
		scan.withStopRow("r003".getBytes(), true);
		
		// 指定返回数据只包含某列族的kv
		scan.addFamily("f1".getBytes());
		
		// 设置返回结果的数据过滤器,相当于实现条件查询
		scan.setFilter(null);
		
		// 设置在本scan中,一个Result中最多包含的kv个数,用于一行数据中kv量太大的情况
		scan.setBatch(2);
		
		// 是否让regionserver侧缓存本次扫描到的数据到内存中
		scan.setCacheBlocks(false);
		
		// 指定本次scan最多返回的result个数
		scan.setLimit(10);
        
        // 每次rpc请求记录数,默认1
        scan.setCaching(100);
		
		// 指定本次scan按反方向进行,从stoprowkey -> startrowkey
		scan.setReversed(false);
		
		// 从另一个维度:字节大小,来限制一个result的大小
		scan.setMaxResultSize(10240);
		
		// 指定本次scan是否要包含raw数据(已经被删除的, 或过时的版本数据)
		scan.setRaw(false);
		
		ResultScanner rsScanner = t1.getScanner(scan);
		
		Iterator<Result> iter = rsScanner.iterator();
		// 迭代每一个result
		while(iter.hasNext()) {
			
			Result rs = iter.next();
			
			// 清楚数据schema的情况下,直接取某个key的value
//			rs.getValue("f1".getBytes(), "name".getBytes());
			
			// 不清楚数据schema或每行的schema不一致的情况下,从result中遍历出每一个key-value
			while(rs.advance()) {
				Cell cell = rs.current();
				printCellUtil(cell);
			}
		}
		
        rsScanner.close();
        t1.close();
        conn.close();
	}
	
	// cell数据遍历的工具写法
	public static void printCellUtil(Cell cell) {
		
		byte[] cloneRow = CellUtil.cloneRow(cell);
		byte[] cloneFamily = CellUtil.cloneFamily(cell);
		byte[] cloneQualifier = CellUtil.cloneQualifier(cell);
		byte[] cloneValue = CellUtil.cloneValue(cell);
		
		String r = new String(cloneRow);
		String f = new String(cloneFamily);
		String q = new String(cloneQualifier);
		String v = new String(cloneValue);
		
		System.out.println(r + " -> " + f + " -> " + q + " -> " + v);
	}
	
	// cell数据遍历的底层写法
	public static void printCell(Cell cell) {
		
		byte[] rowArray = cell.getRowArray();
		int rowOffset = cell.getRowOffset();
		short rowLength = cell.getRowLength();
		
		byte[] familyArray = cell.getFamilyArray();
		int familyOffset = cell.getFamilyOffset();
		byte familyLength = cell.getFamilyLength();
		
		byte[] qualifierArray = cell.getQualifierArray();
		int qualifierOffset = cell.getQualifierOffset();
		int qualifierLength = cell.getQualifierLength();
		
		byte[] valueArray = cell.getValueArray();
		int valueOffset = cell.getValueOffset();
		int valueLength = cell.getValueLength();
		
		String r = new String(rowArray, rowOffset, rowLength);
		String f = new String(familyArray, familyOffset, familyLength);
		String q = new String(qualifierArray, qualifierOffset, qualifierLength);
		String v = new String(valueArray, valueOffset, valueLength);
		
		System.out.println(r + " -> " + f + " -> " + q + " -> " + v);
	}
}

标签:hbase,org,hadoop,t1,API,使用,import,HBase,getBytes
来源: https://www.cnblogs.com/lijiong/p/14729854.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有