ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Note_Logistics_Day06

2021-05-24 18:34:16  阅读:209  来源: 互联网

标签:Logistics img Day06 Note Kudu kudu spark 数据 id



stypora-copy-images-to: img
typora-root-url: ./

Logistics_Day04:Kudu 入门使用

01-[复习]-上次课程内容回顾

​ 主要讲解:Kudu 存储引擎,类似HBase数据库,属于HBase和HDFS折中产品,既能够随机数据读写,又支持批量数据加载分析。

1、物流项目ETL流程
	三大业务板块
	1)、数据源Source,都是从Kafka消费交易业务数据
	2)、编写结构化流程序应用,消费数据Kafka数据,进行ETL存储到各个业务板块存储引擎,比如Kudu、ES等
	3)、开发相关业务板块应用程序
		- 离线报表和即席查询:Kudu、SparkSQL及Impala和Hue
		- 实时大屏和数据服务接口:ClickHouse、NodeJS&Vue、SpringCloud
		- 快递物流信息检索:Es、SpringCloud
		
2、Kudu 框架概述
	为什么要使用Kudu,解决什么问题???
		业务数据需要离线批处理(比如每日统计报表,批量加载数据分析):HDFS Parquet
		随机数据读写(比如依据某个字段或主键查询相关数据):HBase
					|
		Kudu 诞生背景,小米、网易都在使用Kudu
	SQL on Hadoop 技术框架发展史
		Hive ->   HDFS\HBase   最早,最基础
		Impala -> HDFS\HBase  内存分析引擎
		Impala -> Kudu       快速存储之上快速分析
	Kudu是什么
	Kudu 架构
		数据模型:表、Tablet、副本(leader和follower,Raft协议,数据一致性)
		分区策略:Range(范围)、Hash(哈希)、多级分区
		列式存储:查询少量列时 IO 少,速度快;数据压缩比高
		整体架构:分布式架构,主从架构,主节点和从节点
			Master:老大,管理者,元数据管理,Client请求Kudu表数据时,首先找到是Master
				使用Raft协议,不需要依赖Zookeeper,奇数个节点,高可用性
			TabletServer:小弟,干活的,管理Tablet数据,尤其数据读写
	Kudu 安装部署
		采用CM部署安装Kudu集群(伪分布式),如果是分布式集群,注意集群中各个机器时间同步

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-6Z98S3vb-1621851226298)(/img/1612408861444.png)]

Kudu Client API:三种方式
	- JavaClient、C++Client、PythonClient等API调用
	- 与Spark集成,使用RDD或DataFrame操作数据
		KuduContext
		SparkSession
	- 与Impala集成,提供SQL语句进行操作

02-[了解]-第6天:课程内容提纲

主要讲解:存储引擎Kudu,类似HBase数据库,由Cloudera公司开发,目的取代HDFS和HBase框架,

Kudu API使用
	1)、Java Client API使用
		DDL操作(创建表、删除表和修改表)
		DML操作(CRUD,增删改查)
		
	2)、与Spark集成
		提供与Spark集成库,直接调用API使用即可

Kudu提供三种方式,操作Kudu数据库,进行DDL操作和DML操作:

  • 1)、方式一:可通过Java client、C++ client、Python client操作Kudu表,要构建Client并编写应用程序;
    • https://kudu.apache.org/docs/developing.html#developing-applications-with-apache-kudu
  • 2)、方式二:可通过Impala的shell对Kudu表进行交互式的操作,因为Impala2.8及以上的版本已经集成了对Kudu的操作。
    • 直接定义Impala表数据存储在Kudu中,内部集成
  • 3)、方式三:通过Kudu-Spark包集成Kudu与Spark,并编写Spark应用程序来操作Kudu表
    • KuduContext,类似SparkContext,进行DDL操作和DML操作
    • SparkSession操作Kudu表数据,CRUD操作

无论是Java Client API使用,还是Kudu集成Spark使用,添加Maven 依赖:

<dependency>
  <groupId>org.apache.kudu</groupId>
  <artifactId>kudu-client</artifactId>
  <version>1.9.0-cdh6.2.1</version>
</dependency>

<dependency>
  <groupId>org.apache.kudu</groupId>
  <artifactId>kudu-spark_2.11</artifactId>
  <version>1.9.0-cdh6.2.1</version>
</dependency>

​ KUDU Client 在与服务端交互时,先从 Master Server 获取元数据信息,然后去 Tablet Server读写数据,如下图:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-69QNixz5-1621851226300)(/img/1612410080266.png)]

今日工程目录结构说明:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FcxyBXWO-1621851226301)(/img/image-20210524084243810.png)]

03-[掌握]-Java 操作 Kudu之创建Maven Project

​ 首先使用Java Client API操作Kudu数据库,DDL操作(创建表、删除表及修改表)和DML操作(CRUD)。

创建Maven Project设置GAV如下图所示:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EBTeL1ad-1621851226302)(/img/1612411674589.png)]

创建Maven Module模块,用于编写Java API 操作Kudu,模块GAV设置如下所示:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-e4cZmSz7-1621851226303)(/img/1612411802819.png)]

​ 构建Maven Project工程或Maven Module模块,POM文件添加依赖如下:

    <!-- 指定仓库位置,依次为aliyun、cloudera和jboss仓库 -->
    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>jboss</id>
            <url>http://repository.jboss.com/nexus/content/groups/public</url>
        </repository>
    </repositories>

    <!-- 版本属性 -->
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <kudu.version>1.9.0-cdh6.2.1</kudu.version>
        <junit.version>4.12</junit.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-client</artifactId>
            <version>${kudu.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-client-tools</artifactId>
            <version>${kudu.version}</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

配置IDEA远程连接虚拟机,方便文件传输和远程命令行基本操作。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cHRnXzKW-1621851226304)(/img/1615876822272.png)]

04-[掌握]-Java 操作 Kudu之创建KuduClient实例

​ 在使用Java Client API之前,首先包package创建完成,此外,使用Java Client API操作Kudu数据库,需要创建客户端实例对象:KuduClient对象。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wyUENGxH-1621851226304)(/img/1612412290745.png)]

首先创建KuduClient对象,并且在应用运行结束的时候,需要关闭Client,所以采用JUnit方式构建和关闭。

package cn.itcast.kudu.table;

import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * 基于Java API对Kudu进行CRUD操作,包含创建表及删除表的操作
 */
public class KuduTableDemo {

	// 定声明KuduClient实例对象
	private KuduClient kuduClient = null ;

	@Before
	public void init() {
		// KuduMaster地址信息
		String masterAddresses = "node2.itcast.cn:7051" ;
		// 初始化KuduClient实例对象
		kuduClient = new KuduClient.KuduClientBuilder(masterAddresses)
			// 设置对此Kudu进行操作时超时时间,默认值为30s
			.defaultOperationTimeoutMs(10000)
			.build();
	}
	
	@Test
	public void testKuduClient(){
		System.out.println(kuduClient);
	}

	@After
	public void close() throws KuduException {
		// 测试完成以后,关闭连接
		if(null != kuduClient) {
			kuduClient.close();
		}
	}

}

​ 在Kudu提供API中,尤其是Java Client API,构建对象时,比如KuduClient,往往使用建造者设计模式,首先创建Builder对象,设置相关属性,最后获取实例对象。

05-[掌握]-Java 操作 Kudu之创建表(Hash分区)

任务:使用Java Client API在Kudu中创建表

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-b4tcGCu9-1621851226305)(/img/1615877522545.png)]

create table itcast_users(
    id int,
    name string,
    age byte,
    primary key(id)
)
paritition by hash(id) partitions 3
stored as kudu ;

Kudu提供面向对象API,将创建表DDL语句,封装到类中,具体如下图所示:

  • 1)、Schema,里面存储表的所有列信息(列名称和列类型)
  • 2)、CreateTableOptions,封装表的分区策略,分区数目和副本数

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vL1nngmJ-1621851226305)(/img/1612420659082.png)]

创建测试方法,编写创建表的代码:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-eCOXXCUD-1621851226306)(/img/1615877820058.png)]

	/**
	 * 用于构建Kudu表中每列的字段信息Schema
	 *
	 * @param name 字段名称
	 * @param type 字段类型
	 * @param isKey 是否为Key
	 * @return ColumnSchema对象
	 */
	private ColumnSchema newColumnSchema(String name, Type type, boolean isKey) {
		// 创建ColumnSchemaBuilder实例对象
		ColumnSchema.ColumnSchemaBuilder column = new ColumnSchema.ColumnSchemaBuilder(name, type);
		// 设置是否为主键
		column.key(isKey) ;
		// 构建 ColumnSchema
		return column.build() ;
	}

	/**
	 * 创建Kudu中的表,表的结构如下所示:
	 create table itcast_users(
		 id int,
		 name string,
		 age byte,
		 primary key(id)
	 )
	 paritition by hash(id) partitions 3
	 stored as kudu ;
	 */
	@Test
	public void createKuduTable() throws KuduException {
		// a. 定义Schema信息,列名称和列类型
		List<ColumnSchema> columns = new ArrayList<>();
		columns.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build());
		columns.add(newColumnSchema("name", Type.STRING, false));
		columns.add(newColumnSchema("age", Type.INT8, false));
		Schema schema = new Schema(columns) ;

		// b. 设置表的属性
		CreateTableOptions options = new CreateTableOptions() ;
		// 设置分区策略
		options.addHashPartitions(Arrays.asList("id"), 3);
		// 设置副本数目
		options.setNumReplicas(1) ;

		// c. 传递参数,创建表
		/*
			public KuduTable createTable(String name, Schema schema, CreateTableOptions builder)
		 */
		KuduTable kuduTable = kuduClient.createTable("itcast_users", schema, options);
		System.out.println("Kudu Table ID = " + kuduTable.getTableId());
	}

06-[掌握]-Java 操作 Kudu之删除表

任务:删除Kudu中表,先判断表是否存在。

	/**
	 * 判断表是否存在,如果存在,将表删除
	 */
	@Test
	public void dropKuduTable() throws KuduException {
		// 判断表是否存在
		if(kuduClient.tableExists("itcast_users")){
			// 传递表的名称,进行删除
			kuduClient.deleteTable("itcast_users") ;
		}
	}

07-[掌握]-Java 操作 Kudu之插入数据

任务Task:向Kudu表中插入数据,先插入单条数据,再批量插入。

  • 1)、获取表的句柄:KuduTable,通过KuduClient获取
  • 2)、插入数据时,创建Insert对象,设置每行Row的值
  • 3)、当向Kudu表插入数据时,创建会话实例对象KuduSession类似PreparedStatement对象

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Vt2Uwfy2-1621851226306)(/img/1612421761460.png)]

编写代码,向Kudu表插入数据,步骤如下所示:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yfKOvYMC-1621851226307)(/img/1615878818247.png)]

	/**
	 * 将数据插入到Kudu Table中: INSERT INTO (id, name, age) VALUES (1001, "zhangsan", 26)
	 */
	@Test
	public void insertKuduData() throws KuduException {
		// a. 获取操作表句柄
		KuduTable kuduTable = kuduClient.openTable("itcast_users");

		// b. 获取KuduSession实例对象
		KuduSession kuduSession = kuduClient.newSession();

		// c. 插入数据,获取Insert对象
		Insert insert = kuduTable.newInsert();
		// d. 获取Row对象
		PartialRow insertRow = insert.getRow();
		// 设置值
		insertRow.addInt("id", 1001);
		insertRow.addString("name", "itcast");
		insertRow.addByte("age", (byte)25);

		// e. 插入数据
		kuduSession.apply(insert);

		// f. 关闭连接
		kuduSession.close();
	}

上面编写代码,完成单条数据插入,接下来,批量插入数据,代码如下所示:

	/**
	 * 将数据插入到Kudu Table中: INSERT INTO (id, name, age) VALUES (1001, "zhangsan", 26)
	 */
	@Test
	public void insertKuduData() throws KuduException {
		// a. 获取操作表句柄
		KuduTable kuduTable = kuduClient.openTable("itcast_users");

		// b. 获取KuduSession实例对象
		KuduSession kuduSession = kuduClient.newSession();
		// 设置手动提交,刷新数据
		kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
		// 设置缓存数据量
		kuduSession.setMutationBufferSpace(1000);

		Random random = new Random();
		for(int index = 0; index < 100; index ++){
			// c. 插入数据,获取Insert对象
			Insert insert = kuduTable.newInsert();
			// d. 获取Row对象
			PartialRow insertRow = insert.getRow();
			// 设置值
			insertRow.addInt("id", 100 + index);
			insertRow.addString("name", "zhangsan-" + index);
			insertRow.addByte("age", (byte)(random.nextInt(10) + 21));

			// e. 插入数据
			kuduSession.apply(insert);
		}
		// 手动提交
		kuduSession.flush();

		// f. 关闭连接
		kuduSession.close();
	}

08-[掌握]-Java 操作 Kudu之全量查询数据

任务:从Kudu表中查询数据,属于全量查询数据

从Kudu中查询数据与从Hbase查询数据代码类似,进行表的数据扫描Scanner

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-o8oi16fg-1621851226307)(/img/1615880361316.png)]

注意:从Kudu表加载数据时,思路与HBase不一样,从表的每个Tablet中扫描查询数据,放到迭代器中,最后将所有Tablet查询结果的迭代器放入迭代器中。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ew3KfU03-1621851226307)(/img/1612422739021.png)]

编写代码,从Kudu表全量加载数据,注意,遍历查询数据时,进行双层循环获取数据。

	/**
	 * 从Kudu表中全量加载数据
	 */
	@Test
	public void queryKuduData() throws KuduException {
		// 1. 获取表的句柄
		KuduTable kuduTable = kuduClient.openTable("itcast_users");

		// 2. 获取扫描器对象
		KuduScanner.KuduScannerBuilder scannerBuilder = kuduClient.newScannerBuilder(kuduTable);
		KuduScanner kuduScanner = scannerBuilder.build();

		// 3. 遍历获取的数据
		int index = 0 ;
		while (kuduScanner.hasMoreRows()){  // 判断是否还有表的Tablet数据为获取
			index += 1;
			System.out.println("tablet index = " + index);
			// 获取每个tablet中扫描的数据
			RowResultIterator rowResults = kuduScanner.nextRows();
			// 遍历每个Tablet中数据
			while (rowResults.hasNext()){
				RowResult rowResult = rowResults.next();
				System.out.println(
					"id = " + rowResult.getInt("id")
						+ ", name = " + rowResult.getString("name")
						+ ", age = " + rowResult.getByte("age")
				);
			}
		}
	}

09-[掌握]-Java 操作 Kudu之过滤查询数据

任务:在实际项目中,从Kudu加载数据,肯定有过滤条件,接下来实现,如何进行过滤查询数据

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-u8K3XPkw-1621851226308)(/img/1615880958773.png)]

使用KuduPlus实现上述过滤条件

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-p6FNbbma-1621851226308)(/img/1615881042484.png)]

分析思路:

  • 1)、功能一、选取字段,在Kudu中或者SQL语句中,称为project,投影,选择字段
  • 2)、功能二、过滤条件,在Kudu中或者SQL语句中,称为predicate,谓词,过滤条件
	/**
	 * 从Kudu表中全量加载数据
	 */
	@Test
	public void queryKuduData() throws KuduException {
		// 1. 获取表的句柄
		KuduTable kuduTable = kuduClient.openTable("itcast_users");

		// 2. 获取扫描器对象
		KuduScanner.KuduScannerBuilder scannerBuilder = kuduClient.newScannerBuilder(kuduTable);
		// TODO: 设置过滤条件
		/*
			查询id和age两个字段的值,年龄age小于25,id大于150
		 */
		// TODO: 查询id和age两个字段
		scannerBuilder.setProjectedColumnNames(Arrays.asList("id", "age"));
		// TODO: 年龄age小于25,id大于150
		scannerBuilder.addPredicate(
			KuduPredicate.newComparisonPredicate(
				newColumnSchema("id", Type.INT32, true),
				KuduPredicate.ComparisonOp.GREATER,
				150
			)
		);
		scannerBuilder.addPredicate(
			KuduPredicate.newComparisonPredicate(
				newColumnSchema("age", Type.INT8, false),
				KuduPredicate.ComparisonOp.LESS,
				(byte)25
			)
		);

		KuduScanner kuduScanner = scannerBuilder.build();

		// 3. 遍历获取的数据
		int index = 0 ;
		while (kuduScanner.hasMoreRows()){  // 判断是否还有表的Tablet数据为获取
			index += 1;
			System.out.println("tablet index = " + index);
			// 获取每个tablet中扫描的数据
			RowResultIterator rowResults = kuduScanner.nextRows();
			// 遍历每个Tablet中数据
			while (rowResults.hasNext()){
				RowResult rowResult = rowResults.next();
				System.out.println(
					"id = " + rowResult.getInt("id")
						+ ", age = " + rowResult.getByte("age")
				);
			}
		}
	}

10-[掌握]-Java 操作 Kudu之更新及删除数据

任务:向Kudu表中数据进行更新和删除操作,类似Insert插入数据时操作。

  • 1)、更新数据,只能根据主键更新数据

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EoYlNBy8-1621851226309)(/img/1615881765826.png)]

	/**
	 * 更新Kudu表中数据
	 */
	@Test
	public void updateKuduData() throws KuduException {
		// a. 获取操作表句柄
		KuduTable kuduTable = kuduClient.openTable("itcast_users");

		// b. 获取KuduSession实例对象
		KuduSession kuduSession = kuduClient.newSession();

		// c. 获取更新数据update对象
		Update newUpdate = kuduTable.newUpdate();
		// 获取Row对象
		PartialRow updateRow = newUpdate.getRow();
		// 设置更新的数据
		updateRow.addInt("id", 153);
		updateRow.addString("name", "张三疯");

		// e. 更新数据
		kuduSession.apply(newUpdate);

		// f. 关闭连接
		kuduSession.close();
	}

​ 在Kudu中,除了提供insert和update插入与更新方法外,开提供:upsert,表示当表中主键存在时,更新数据;不存在时,插入数据。实际项目中,建议使用upsert操作。

	/**
	 * 更新Kudu表中数据
	 */
	@Test
	public void upsertKuduData() throws KuduException {
		// a. 获取操作表句柄
		KuduTable kuduTable = kuduClient.openTable("itcast_users");

		// b. 获取KuduSession实例对象
		KuduSession kuduSession = kuduClient.newSession();

		// c. 获取更新数据update对象
		Upsert newUpsert = kuduTable.newUpsert();
		// 获取Row对象
		PartialRow upsertRow = newUpsert.getRow();
		// 设置更新的数据
		upsertRow.addInt("id", 253);
		upsertRow.addString("name", "张疯");
		upsertRow.addByte("age", (byte)50);

		// e. 更新数据
		kuduSession.apply(newUpsert);
		kuduSession.flush();

		// f. 关闭连接
		kuduSession.close();
	}

对Kudu表数据进行删除时,需要按照主键id删除。

	/**
	 * 删除Kudu表中数据
	 */
	@Test
	public void deleteKuduData() throws KuduException {
		// a. 获取操作表句柄
		KuduTable kuduTable = kuduClient.openTable("itcast_users");

		// b. 获取KuduSession实例对象
		KuduSession kuduSession = kuduClient.newSession();

		// c. 获取删除数据对象
		Delete newDelete = kuduTable.newDelete();
		// 获取Row对象
		PartialRow deleteRow = newDelete.getRow();
		// 设置主键
		deleteRow.addInt("id", 253);

		// e. 更新数据
		kuduSession.apply(newDelete);
		kuduSession.flush();

		// f. 关闭连接
		kuduSession.close();
	}

11-[掌握]-Java 操作 Kudu之创建表(范围分区)

为了提供可扩展性,Kudu 表被划分为称为 tablets 的单元,并分布在许多 tablet servers 上。

  • 1)、哈希分区:Hash Partitioning
    • 哈希分区通过哈希值,将行分配到不同的 buckets ( 存储桶 )中;
    • 哈希分区是一种有效的策略,当不需要对表进行有序访问时,哈希分区对于在 tablet 之间随
      机散布这些功能是有效的,这有助于减轻热点和 tablet 大小不均匀;

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-octxG2uM-1621851226309)(/img/1615882540436.png)]

  • 2)、范围分区:Range Partitioning
    • 范围分区可根据存入数据的数据量,均衡的存储到各个机器上,防止机器出现负载不均衡现象;
    • 分区键必须是主键 或 主键的一部分;
    • Range分区的方式:id

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-TdPfwI0j-1621851226310)(/img/1615882598956.png)]

实现上述案例需求:创建Kudu表,按照id进行范围分区

	/**
	 * 创建Kudu中的表,采用对id进行Range范围分区
	 */
	@Test
	public void createKuduTableByRange() throws KuduException {
		// a. 定义Schema信息,列名称和列类型
		List<ColumnSchema> columns = new ArrayList<>();
		columns.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build());
		columns.add(newColumnSchema("name", Type.STRING, false));
		columns.add(newColumnSchema("age", Type.INT8, false));
		Schema schema = new Schema(columns) ;

		// b. 设置表的属性
		CreateTableOptions options = new CreateTableOptions() ;
		// 设置分区策略
		options.setRangePartitionColumns(Arrays.asList("id")); // 设置范围分区字段名称
		/*
			id < 100
			100 <= id < 500
			id > 500
		 */
		// id < 100
		PartialRow upper100 = new PartialRow(schema);
		upper100.addInt("id", 100);
		options.addRangePartition(new PartialRow(schema), upper100);

		// 100 <= id < 500 
		PartialRow lower100 = new PartialRow(schema);
		lower100.addInt("id", 100);
		PartialRow upper500 = new PartialRow(schema);
		upper500.addInt("id", 500);
		options.addRangePartition(lower100, upper500);

		// id > 500
		PartialRow lower500 = new PartialRow(schema);
		lower500.addInt("id", 500);
		options.addRangePartition(lower500, new PartialRow(schema));

		// 设置副本数目
		options.setNumReplicas(1) ;

		// c. 传递参数,创建表
		/*
			public KuduTable createTable(String name, Schema schema, CreateTableOptions builder)
		 */
		KuduTable kuduTable = kuduClient.createTable("itcast_users_range", schema, options);
		System.out.println("Kudu Table ID = " + kuduTable.getTableId());
	}

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9sUqmb2P-1621851226310)(/img/1615883070155.png)]

12-[掌握]-Java 操作 Kudu之创建表(多级分区)

​ 在Kudu中,创建表时,除了Hash分区和Range范围分区以外, 还支持多级分区:

  • 1)、形式一、先哈希分区,再进行范围分区
  • 2)、形式二、先哈希分区,再哈希分区

多级分区特点:

  • Kudu 允许一个表上组合使用Hash分区 及 Range分区;
  • 分区键必须是主键 或 主键的一部分;
  • 多级分区可以保留各个分区类型的优点,同时减少每个分区的缺点;

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-T5IpLYut-1621851226311)(/img/1615883257060.png)]

创建表,实现上述表分区要求:先按照id进行哈希分区,再按照age做范围分区

	/**
	 * 创建Kudu中的表,采用多级分区策略,结合哈希分区和范围分区组合使用
	 */
	@Test
	public void createKuduTableMulti() throws KuduException {
		// a. 构建表的Schema信息
		List<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();
		columnSchemas.add(newColumnSchema("id", Type.INT32, true)) ;
		columnSchemas.add(newColumnSchema("age", Type.INT8, true)) ;
		columnSchemas.add(newColumnSchema("name", Type.STRING, false)) ;
		// 定义Schema信息
		Schema schema = new Schema(columnSchemas) ;
		
		// b. Kudu表的分区策略及分区副本数目设置
		CreateTableOptions tableOptions = new CreateTableOptions() ;
		// TODO: e.1. 设置哈希分区
		List<String> columnsHash = new ArrayList<>() ;
		columnsHash.add("id") ;
		tableOptions.addHashPartitions(columnsHash, 5) ;
		
		// TODO: e.2. 设值范围分区
		/*
			age 做 range分区,分3个区
			- < 21(小于等于20岁)
			- 21 - 41(21岁到40岁)
			- 41(41岁以上,涵盖41岁)
		*/
		List<String> columnsRange = new ArrayList<>() ;
		columnsRange.add("age") ;
		tableOptions.setRangePartitionColumns(columnsRange) ;
		// 添加范围分区
		PartialRow upper21 = new PartialRow(schema) ;
		upper21.addByte("age", (byte)21);
		tableOptions.addRangePartition(new PartialRow(schema), upper21) ;
		// 添加范围分区
		PartialRow lower21 = new PartialRow(schema) ;
		lower21.addByte("age", (byte)21);
		PartialRow upper41 = new PartialRow(schema) ;
		upper41.addByte("age", (byte)41);
		tableOptions.addRangePartition(lower21, upper41) ;
		// 添加范围分区
		PartialRow lower41 = new PartialRow(schema) ;
		lower41.addByte("age", (byte)41);
		tableOptions.addRangePartition(lower41, new PartialRow(schema)) ;
		
		// 副本数设置
		tableOptions.setNumReplicas(1) ;
		
		// c. 在Kudu中创建表
		KuduTable userTable = kuduClient.createTable("itcast_users_multi", schema, tableOptions);
		System.out.println(userTable.toString());
	}

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-y7pVm4NG-1621851226311)(/img/1615883515978.png)]

13-[掌握]-Java 操作 Kudu之 添加列和删除列

任务:对Kudu中表进行修改,要么表添加列,要么表删除列,编程演示代码

  • 1)、添加列addColumn
	/**
	 * 对Kudu中表进行修改,增加列:address,String
	 */
	@Test
	public void alterKuduTableAddColumn() throws KuduException {
		// 添加列
		AlterTableOptions ato = new AlterTableOptions() ;
		ato.addColumn("address",Type.STRING, "shanghai");
		// 修改表
		AlterTableResponse response = kuduClient.alterTable("itcast_users", ato);
		System.out.println(response.getTableId());
	}
  • 2)、删除列dropColumn
	/**
	 * 对Kudu中表进行修改,删除列:address
	 */
	@Test
	public void alterKuduTableDropColumn() throws KuduException {
		// 添加列
		AlterTableOptions ato = new AlterTableOptions() ;
		ato.dropColumn("address");
		// 修改表
		AlterTableResponse response = kuduClient.alterTable("itcast_users", ato);
		System.out.println(response.getTableId());
	}

14-[掌握]-Kudu 集成 Spark之创建Maven Project

​ Kudu支持与Spark集成,并且提供集成库jar包,直接引入库,调用API即可,提供2套API:

  • 1)、第一套:基于RDD数据集操作,KuduContext上下文对象
    • DDL操作,创建Kudu表和删除Kudu表
  • 2)、第二套:基于DataFrame数据集操作,SparkSession会话对象
    • 从Kudu表中加载load和保存save数据

首先,创建Maven Module模块,添加相关依赖,创建包,如下所示:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-HoAb0m1j-1621851226312)(/img/1612427178722.png)]

构建Maven Project工程或Maven Module模块,POM文件添加依赖如下:

    <!-- 指定仓库位置,依次为aliyun、cloudera和jboss仓库 -->
    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>jboss</id>
            <url>http://repository.jboss.com/nexus/content/groups/public</url>
        </repository>
    </repositories>

    <!-- 版本属性 -->
    <properties>
        <scala.version>2.11.12</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <spark.version>2.4.0-cdh6.2.1</spark.version>
        <hadoop.version>3.0.0-cdh6.2.1</hadoop.version>
        <kudu.version>1.9.0-cdh6.2.1</kudu.version>
    </properties>

    <!-- 依赖JAR包 -->
    <dependencies>

        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-client-tools</artifactId>
            <version>${kudu.version}</version>
        </dependency>

        <!-- Kudu Client 依赖包 -->
        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-client</artifactId>
            <version>${kudu.version}</version>
        </dependency>

        <!-- Junit 依赖包 -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.kudu/kudu-spark2 -->
        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-spark2_2.11</artifactId>
            <version>${kudu.version}</version>
        </dependency>

        <!-- 依赖Scala语言 -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <!-- Spark Core 依赖 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- Spark SQL 依赖 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- Hadoop Client 依赖 -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

    </dependencies>

    <build>
        <outputDirectory>target/classes</outputDirectory>
        <testOutputDirectory>target/test-classes</testOutputDirectory>
        <resources>
            <resource>
                <directory>${project.basedir}/src/main/resources</directory>
            </resource>
        </resources>
        <!-- Maven 编译的插件 -->
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

编写Spark Application时,设置日志级别,通过log4j.properties设置,内容如下所示:

# Set everything to be logged to the console
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Set the default spark-shell log level to WARN. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=WARN

# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=WARN
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
spark-shell时,可以通过--packages或--jars加载依赖jar包:
1)、--packages
	--packages org.apache.kudu:kudu-spark_2.10:1.5.0
	必须联网,基于ivy方式下载所需要的jar包,存储在当前用户宿主目录下$USER_HOME/.ivy/jars/

2)、--jars
	--jars /root/jars/xxx.jar,/root/jars/yy.jar 
	需要将jar包下载完成,放在本地,加载到应用中

15-[掌握]-Kudu 集成 Spark之创建表和删除表

任务:使用KuduContext创建Kudu表和删除Kudu表

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-U9zFWH7W-1621851226312)(/img/1615885235177.png)]

package cn.itcast.kudu.table

import java.util

import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

/**
 * Kudu与Spark集成,使用KuduContext创建表和删除表
 */
object KuduSparkTableDemo {
	
	/**
	 * 创建Kudu表,指定名称
	 *
	 * @param tableName 表的名称
	 * @param kuduContext KuduContext实例对象
	 */
	def createKuduTable(tableName: String, kuduContext: KuduContext): Unit = {
		// a. 表的Schema信息
		val schema: StructType = StructType(
			Array(
				StructField("id", IntegerType, nullable = false),
				StructField("name", StringType, nullable = true),
				StructField("age", IntegerType, nullable = true),
				StructField("gender", StringType, nullable = true)
			)
		)
		// b. 表的主键
		val keys: Seq[String] = Seq("id")
		// c. 创建表的选项设置
		val options: CreateTableOptions = new CreateTableOptions()
		options.setNumReplicas(1)
		options.addHashPartitions(util.Arrays.asList("id"), 3)
		// 调用创建表方法
		/*
		  def createTable(
		      tableName: String,
		      schema: StructType,
		      keys: Seq[String],
		      options: CreateTableOptions
		  ): KuduTable
		 */
		val kuduTable = kuduContext.createTable(tableName, schema, keys, options)
		println("Kudu Table ID: " + kuduTable)
	}
	
	/**
	 * 删除Kudu中表
	 * @param tableName 表的名称
	 * @param kuduContext KuduContext实例对象
	 */
	def dropKuduTable(tableName: String, kuduContext: KuduContext) = {
		// 判断表是否存在,如果存在,就删除表
		if(kuduContext.tableExists(tableName)){
			kuduContext.deleteTable(tableName)
		}
	}
	
	def main(args: Array[String]): Unit = {
		// 1. 构建SparkSession实例对象
		val spark: SparkSession = SparkSession.builder()
    		.appName(this.getClass.getSimpleName.stripSuffix("$"))
    		.master("local[2]")
    		.config("spark.sql.shuffle.partitions", "2")
    		.getOrCreate()
		import spark.implicits._
		
		// TODO: 创建KuduContext对象
		val kuduContext: KuduContext = new KuduContext("node2.itcast.cn:7051", spark.sparkContext)
		println(s"KuduContext: ${kuduContext}")
		
		// 任务1: 创建表
		//createKuduTable("kudu_itcast_users", kuduContext)
		
		// 任务2: 删除表
		dropKuduTable("kudu_itcast_users", kuduContext)
		
		
		// 应用结束,关闭资源
		spark.stop()
	}
	
}

注意:在创建表时,主键不能为null,必须设置为false,字段放在最前面。

16-[理解]-Kudu 集成 Spark之数据CRUD操作

任务:编写程序,对Kudu表的数据,进行CRUD操作,与Java Client API类似

  • 1)、Insert插入数据、INSERT-IGNORE 如果存在,忽略
  • 2)、DELETE删除数据
  • 3)、UPDATE更新数据
  • 4)、UPSERT插入更新数据,主键不存在就是插入,存在就是更新
  • 1)、插入数据insert
package cn.itcast.kudu.data

import cn.itcast.kudu.table.KuduSparkTableDemo.createKuduTable
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * 对Kudu表的数据,进行CRUD操作
 */
object KuduSparkDataDemo {
	
	/**
	 * 向Kudu表中插入数据
	 */
	def insertData(spark: SparkSession, kuduContext: KuduContext, tableName: String): Unit = {
		// a. 模拟产生数据
		// TODO: 当RDD或Seq中数据类型为元组时,直接调用toDF,指定列名称,转换为DataFrame
		val usersDF: DataFrame = spark.createDataFrame(
			Seq(
				(1001, "zhangsan", 23, "男"),
				(1002, "lisi", 22, "男"),
				(1003, "xiaohong", 24, "女"),
				(1004, "zhaoliu2", 33, "男")
			)
		).toDF("id", "name", "age", "gender")
		
		// b. 将数据保存至Kudu表
		kuduContext.insertRows(usersDF, tableName)
	}
	
	def main(args: Array[String]): Unit = {
		// 1. 构建SparkSession实例对象
		val spark: SparkSession = SparkSession.builder()
			.appName(this.getClass.getSimpleName.stripSuffix("$"))
			.master("local[2]")
			.config("spark.sql.shuffle.partitions", "2")
			.getOrCreate()
		import spark.implicits._
		
		// TODO: 创建KuduContext对象
		val kuduContext: KuduContext = new KuduContext("node2.itcast.cn:7051", spark.sparkContext)
		//println(s"KuduContext: ${kuduContext}")
		
		val tableName = "kudu_itcast_users"
		
		// 插入数据
		insertData(spark, kuduContext, tableName)
		
		// 查询数据
		//selectData(spark, kuduContext, tableName)
		
		// 更新数据
		//updateData(spark, kuduContext, tableName)
		
		// 插入更新数据
		//upsertData(spark, kuduContext, tableName)
		
		// 删除数据
		//deleteData(spark, kuduContext, tableName)
		
		
		// 应用结束,关闭资源
		spark.stop()
	}
	
}

  • 2)、查询数据,将数据封装到RDD数据集
	/**
	 * 从Kudu表中读取数据,封装到RDD数据集
	 */
	def selectData(spark: SparkSession, kuduContext: KuduContext, tableName: String): Unit = {
		/*
		  def kuduRDD(
		      sc: SparkContext,
		      tableName: String,
		      columnProjection: Seq[String] = Nil,
		      options: KuduReadOptions = KuduReadOptions()
		  ): RDD[Row]
		 */
		val kuduRDD: RDD[Row] = kuduContext.kuduRDD(spark.sparkContext, tableName, Seq("name", "age"))
		
		// 遍历数据
		kuduRDD.foreach{row =>
			println(
				"name = " + row.getString(0) + ", age = " + row.getInt(1)
			)
		}
	}

此外,可以使用KuduContext对表的数据进行update、upsert、delete等操作,类似insert操作。

17-[掌握]-Kudu 集成 Spark之DataFrame API

任务:基于SparkSQL提供外部数据源方式从Kudu数据库中加载load和保存save数据,封装DataFrame中。

从Kudu表加载和保存数据数据时,可选项如下所示:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SygYXZOA-1621851226313)(/img/1612430230737.png)]

编写SparkSQL程序,从Kudu表加载load数据,进行转换,最终保存到Kudu表中。

  • 1)、加载数据
		// TODO: 2. 从Kudu表加载数据
		val kuduDF: DataFrame = spark.read
			.format("kudu")
			.option("kudu.table", "kudu_itcast_users")
			.option("kudu.master", "node2.itcast.cn:7051")
			.load()
		kuduDF.printSchema()
		kuduDF.show(10, truncate = false)
  • 2)、保存数据
		// TODO: 保存数据到Kudu表
		etlDF.write
			.mode(SaveMode.Append)
			.format("kudu")
			.option("kudu.table", "kudu_itcast_users")
			.option("kudu.master", "node2.itcast.cn:7051")
			.option("kudu.operation", "upsert")
			.save()

完整代码:从Kudu表读取数据,经过ETL转换,保存到Kudu表

package cn.itcast.kudu.sql

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions._

/**
 * 编写SparkSQL程序,从Kudu表加载load数据,进行转换,最终保存到Kudu表中。
 */
object KuduSparkSQLDemo {
	
	def main(args: Array[String]): Unit = {
		// 1. 构建SparkSession实例对象
		val spark: SparkSession = SparkSession.builder()
			.appName(this.getClass.getSimpleName.stripSuffix("$"))
			.master("local[2]")
			.config("spark.sql.shuffle.partitions", "2")
			.getOrCreate()
		import spark.implicits._
		
		// TODO: 2. 从Kudu表加载数据
		val kuduDF: DataFrame = spark.read
			.format("kudu")
			.option("kudu.table", "kudu_itcast_users")
			.option("kudu.master", "node2.itcast.cn:7051")
			.load()
		//kuduDF.printSchema()
		//kuduDF.show(10, truncate = false)
		
		/*
			+----+--------+---+------+
			|id  |name    |age|gender|
			+----+--------+---+------+
			|1001|zhangsan|23 |男    |  -> M
			|1002|lisi    |22 |男    |
			|1004|zhaoliu2|33 |男    |
			|1003|xiaohong|24 |女    |  -> F
			+----+--------+---+------+
		 */
		// 自定义UDF函数,转换gender性别
		val gender_to_udf: UserDefinedFunction = udf(
			(gender: String) => {
				gender match {
					case "男" => "M"
					case "女" => "F"
					case _ => "M"
				}
			}
		)
		
		// TODO: 调用UDF函数,进行转换
		val etlDF: DataFrame = kuduDF.select(
			$"id", $"name", //
			$"age".plus(1).as("age"),
			gender_to_udf($"gender").as("gender")
		)
		//etlDF.printSchema()
		//etlDF.show(10, truncate = false)
		
		// TODO: 保存数据到Kudu表
		etlDF.write
			.mode(SaveMode.Append)
			.format("kudu")
			.option("kudu.table", "kudu_itcast_users")
			.option("kudu.master", "node2.itcast.cn:7051")
			.option("kudu.operation", "upsert")
			.save()
			
		// 应用结束,关闭资源
		spark.stop()
	}
	
}

ocal[2]")
.config(“spark.sql.shuffle.partitions”, “2”)
.getOrCreate()
import spark.implicits._

	// TODO: 2. 从Kudu表加载数据
	val kuduDF: DataFrame = spark.read
		.format("kudu")
		.option("kudu.table", "kudu_itcast_users")
		.option("kudu.master", "node2.itcast.cn:7051")
		.load()
	//kuduDF.printSchema()
	//kuduDF.show(10, truncate = false)
	
	/*
		+----+--------+---+------+
		|id  |name    |age|gender|
		+----+--------+---+------+
		|1001|zhangsan|23 |男    |  -> M
		|1002|lisi    |22 |男    |
		|1004|zhaoliu2|33 |男    |
		|1003|xiaohong|24 |女    |  -> F
		+----+--------+---+------+
	 */
	// 自定义UDF函数,转换gender性别
	val gender_to_udf: UserDefinedFunction = udf(
		(gender: String) => {
			gender match {
				case "男" => "M"
				case "女" => "F"
				case _ => "M"
			}
		}
	)
	
	// TODO: 调用UDF函数,进行转换
	val etlDF: DataFrame = kuduDF.select(
		$"id", $"name", //
		$"age".plus(1).as("age"),
		gender_to_udf($"gender").as("gender")
	)
	//etlDF.printSchema()
	//etlDF.show(10, truncate = false)
	
	// TODO: 保存数据到Kudu表
	etlDF.write
		.mode(SaveMode.Append)
		.format("kudu")
		.option("kudu.table", "kudu_itcast_users")
		.option("kudu.master", "node2.itcast.cn:7051")
		.option("kudu.operation", "upsert")
		.save()
		
	// 应用结束,关闭资源
	spark.stop()
}

}


标签:Logistics,img,Day06,Note,Kudu,kudu,spark,数据,id
来源: https://blog.csdn.net/xianyu120/article/details/117229109

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有