ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Hudi Java Client 测试

2021-02-01 16:00:45  阅读:603  来源: 互联网

标签:Hudi hudi hoodieRecords client Java Client org apache import


Hudi 0.7.0

Hudi Java Client 测试

<dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-java-client</artifactId>
    <version>0.7.0</version>
</dependency>

将hudi 0.7 版本编译好的 hudi-example-0.7.0.jar 放入项目lib中

代码

package com.hjl.hudi;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.*;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.examples.common.HoodieExampleDataGenerator;
import org.apache.hudi.index.HoodieIndex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
 * @Description Hudi Java Client 测试
 * @Author jiale.he
 * @Date 2021-02-01 11:03 周一
 */
/**
 * Hudi Java Client demo: runs an insert → upsert → delete round trip against a
 * local copy-on-write Hudi table using the non-Spark {@link HoodieJavaWriteClient}.
 *
 * <p>Requires hudi-java-client 0.7.0 plus the hudi-examples jar (for
 * {@code HoodieExampleDataGenerator}) on the classpath.
 *
 * @author jiale.he
 * @since 2021-02-01
 */
public class HudiClientTest {

    private static final Logger logger = LoggerFactory.getLogger(HudiClientTest.class);

    public static void main(String[] args) throws Exception {

        // String tablePath = "hdfs://localhost:8020/spark_hudi/huditable";
        String tablePath = "/Users/jiale.he/IdeaProjects/hudi-learn/src/main/resources/huditable";
        String tableName = "huditable";

        // Sample-data generator shipped with the hudi-examples module.
        HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>();

        Configuration hadoopConf = new Configuration();

        // Initialize the table only when the base path does not exist yet:
        // this creates the table path and writes the .hoodie metadata under it.
        Path path = new Path(tablePath);
        FileSystem fs = FSUtils.getFs(tablePath, hadoopConf);
        if (!fs.exists(path)) {
            HoodieTableMetaClient.initTableType(hadoopConf, tablePath, HoodieTableType.COPY_ON_WRITE,
                    tableName, HoodieAvroPayload.class.getName());
        }

        // Build the write-client configuration.
        HoodieWriteConfig hudiWriteConf = HoodieWriteConfig.newBuilder()
                // Avro schema of the records being written
                .withSchema(HoodieExampleDataGenerator.TRIP_EXAMPLE_SCHEMA)
                // insert/upsert parallelism
                .withParallelism(2, 2)
                // delete parallelism
                .withDeleteParallelism(2)
                // in-memory index — suitable for tests/examples only
                .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
                // archive commits when the timeline grows beyond the [20, 30] window
                .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(20, 30).build())
                .withPath(tablePath)
                .forTable(tableName)
                .build();

        // Create the Hudi write client for the Java engine.
        HoodieJavaWriteClient<HoodieAvroPayload> client =
                new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), hudiWriteConf);

        // Insert, then update (reusing/extending the inserted records), then delete half.
        List<HoodieRecord<HoodieAvroPayload>> hoodieRecords = insert(dataGen, client);
        upsert(dataGen, client, hoodieRecords);
        delete(dataGen, client, hoodieRecords);

        client.close();
    }

    /**
     * Deletes the first half of the given records in a new commit.
     *
     * @param dataGen       data generator (unused; kept for signature symmetry with the other steps)
     * @param client        Hudi write client
     * @param hoodieRecords records whose keys are candidates for deletion
     */
    public static void delete(HoodieExampleDataGenerator<HoodieAvroPayload> dataGen,
                              HoodieJavaWriteClient<HoodieAvroPayload> client,
                              List<HoodieRecord<HoodieAvroPayload>> hoodieRecords) {
        String newCommitTime = client.startCommit();
        logger.info("Starting Commit: {}", newCommitTime);
        // Delete only the first half of the records by key.
        int deleteNum = hoodieRecords.size() / 2;
        List<HoodieKey> deleteRecords = hoodieRecords
                .stream()
                .map(HoodieRecord::getKey)
                .limit(deleteNum)
                .collect(Collectors.toList());
        List<WriteStatus> deleteStatus = client.delete(deleteRecords, newCommitTime);
        client.commit(newCommitTime, deleteStatus);
    }

    /**
     * Generates 4 update records, appends them to {@code hoodieRecords} (the
     * caller's list is mutated), and upserts the combined list in a new commit.
     *
     * @param dataGen       data generator used to produce the updates
     * @param client        Hudi write client
     * @param hoodieRecords existing records; updated in place with the new records
     * @return the (mutated) input list, now including the update records
     */
    public static List<HoodieRecord<HoodieAvroPayload>> upsert(HoodieExampleDataGenerator<HoodieAvroPayload> dataGen,
                                                               HoodieJavaWriteClient<HoodieAvroPayload> client,
                                                               List<HoodieRecord<HoodieAvroPayload>> hoodieRecords) {
        String newCommitTime = client.startCommit();
        logger.info("Starting Commit: {}", newCommitTime);
        List<HoodieRecord<HoodieAvroPayload>> toBeUpdated = dataGen.generateUpdates(newCommitTime, 4);
        hoodieRecords.addAll(toBeUpdated);
        // Copy each record before writing: the client mutates write-status fields
        // on the records it is handed, so keep the originals pristine.
        List<HoodieRecord<HoodieAvroPayload>> writeRecords = hoodieRecords
                .stream()
                .map(record -> new HoodieRecord<HoodieAvroPayload>(record))
                .collect(Collectors.toList());
        List<WriteStatus> upsert = client.upsert(writeRecords, newCommitTime);
        client.commit(newCommitTime, upsert);
        return hoodieRecords;
    }

    /**
     * Generates 10 insert records and writes them in a new commit.
     *
     * @param dataGen data generator used to produce the inserts
     * @param client  Hudi write client
     * @return the generated records (a mutable copy owned by the caller)
     */
    public static List<HoodieRecord<HoodieAvroPayload>> insert(HoodieExampleDataGenerator<HoodieAvroPayload> dataGen,
                                                               HoodieJavaWriteClient<HoodieAvroPayload> client) {
        // Start a new commit on the timeline.
        String newCommitTime = client.startCommit();
        logger.info("Starting Commit: {}", newCommitTime);

        // Generate sample inserts and keep a mutable copy for later steps.
        List<HoodieRecord<HoodieAvroPayload>> records = dataGen.generateInserts(newCommitTime, 10);
        List<HoodieRecord<HoodieAvroPayload>> hoodieRecords = new ArrayList<>(records);
        // Copy each record before writing (same reason as in upsert()).
        List<HoodieRecord<HoodieAvroPayload>> writeRecords = hoodieRecords
                .stream()
                .map(record -> new HoodieRecord<HoodieAvroPayload>(record))
                .collect(Collectors.toList());
        List<WriteStatus> upsertStatus = client.upsert(writeRecords, newCommitTime);
        // Complete the commit (writes the .commit file to the timeline).
        client.commit(newCommitTime, upsertStatus);

        return hoodieRecords;
    }


}

在这里插入图片描述

使用spark读取hudi数据

+-------------------+--------------------+------------------------------------+----------------------+-------------------------------------------------------------------+---+------------------------------------+--------------------+---------------------+-------------------+------------------+-------------------+-------------------+------------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key                  |_hoodie_partition_path|_hoodie_file_name                                                  |ts |uuid                                |rider               |driver               |begin_lat          |begin_lon         |end_lat            |end_lon            |fare              |
+-------------------+--------------------+------------------------------------+----------------------+-------------------------------------------------------------------+---+------------------------------------+--------------------+---------------------+-------------------+------------------+-------------------+-------------------+------------------+
|20210201150109     |20210201150109_0_20 |d8d2eda9-47c6-4da7-84ad-fd012364ddb1|2020/01/03            |0eec6b6e-cff0-4eec-af70-47702394c031-0_0-0-0_20210201150109.parquet|0  |d8d2eda9-47c6-4da7-84ad-fd012364ddb1|rider-20210201150107|driver-20210201150107|0.33922164839486424|0.909372837469859 |0.9017656600243008 |0.8236411667430927 |2.0856583634078385|
|20210201150109     |20210201150109_0_12 |149c2df6-32b9-4114-aeef-c51802428e8b|2020/01/02            |8ef23f07-0ac9-4a9e-b9a8-57ab11e122e7-0_0-0-0_20210201150109.parquet|0  |149c2df6-32b9-4114-aeef-c51802428e8b|rider-20210201150107|driver-20210201150107|0.6662084366450246 |0.9065078444936647|0.7124299678100179 |0.05336723040266267|38.63372961020515 |
|20210201150109     |20210201150109_0_13 |746ec9d7-001d-434c-8b0f-538dd85efb42|2020/01/02            |8ef23f07-0ac9-4a9e-b9a8-57ab11e122e7-0_0-0-0_20210201150109.parquet|0  |746ec9d7-001d-434c-8b0f-538dd85efb42|rider-20210201150107|driver-20210201150107|0.4106290929046368 |0.964603455586492 |0.13957566957654388|0.45400191464227213|81.37564420028626 |
+-------------------+--------------------+------------------------------------+----------------------+-------------------------------------------------------------------+---+------------------------------------+--------------------+---------------------+-------------------+------------------+-------------------+-------------------+------------------+

标签:Hudi,hudi,hoodieRecords,client,Java,Client,org,apache,import
来源: https://blog.csdn.net/hjl18309163914/article/details/113521844

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有