
An Introduction to Apache Hudi



Hudi: Uber Engineering’s Incremental Processing Framework on Apache Hadoop

With the evolution of storage formats like Apache Parquet and Apache ORC and query engines like Presto and Apache Impala, the Hadoop ecosystem has the potential to become a general-purpose, unified serving layer for workloads that can tolerate latencies of a few minutes.
In order to achieve this, however, it requires efficient and low latency data ingestion and data preparation in the Hadoop Distributed File System (HDFS).

To address this at Uber, we built Hudi (pronounced as “hoodie”), an incremental processing framework to power all business critical data pipelines at low latency and high efficiency.
In fact, we’ve recently open sourced it for others to use and build on.
But before diving into Hudi, let’s take a step back and discuss why it’s a good idea to think about Hadoop as the unified serving layer.

The passage above introduces the idea of a unified serving layer.

To understand it, first look at the Lambda architecture: there are two serving layers, because batch and streaming are kept separate.

So the problem is that Hadoop handles full batch workloads well, but has no good answer for incremental data.

Streaming used to be the way to handle increments, but it is heavyweight and expensive.

Is there a cheaper incremental approach, if latency can be relaxed to the minute level?

That is Hudi's motivation: a low-cost incremental approach that sits on the trade-off curve between streaming and batch. Like any trade-off, it is about compromise rather than a fundamental breakthrough.

Motivation

Lambda architecture is a common data processing architecture that proposes double compute with streaming and batch layer.
Once every few hours, a batch process is kicked off to compute the accurate business state and the batch update is bulk loaded into the serving layer.
Meanwhile, a stream processing layer computes and serves the same state to circumvent the above multi-hour latency.
However, this state is only an approximate one until it is overridden by the more accurate batch computed state.
Since the states are slightly different, there needs to be either different serving layers for batch and stream, coalesced in an abstraction on top,
or a rather complex serving system (like Druid) which performs reasonably well for record-level updates and batch bulk loads.

 

Kappa was proposed at LinkedIn; in essence it unifies streaming and batch.
As Google has also argued, a separate batch layer is unnecessary: streaming can cover every need, because batch is just a special case of a stream.
Kappa is therefore built on Kafka, so streaming is supported natively,
while batch is supported through replay.

The problems, as the quoted text below also notes, are:

- Streams target record-level updates and are not friendly to ad-hoc queries:
if the query changes you have to replay, and without indexes there are no point lookups.

- The amount of data that can be retained is limited; with too much data, replay becomes infeasible. Checkpoints and state help, but only to a point.

So personally I find the Kappa architecture somewhat contrived, and in practice few teams actually run it this way.

Questioning the need for a separate batch layer, Kappa architecture argues that a stream processing engine could be a general-purpose solution for computations.
In a generic sense, all computations can be described as operators producing a tuple stream and consumers iterating over multiple input tuple streams (i.e. Volcano Iterator model).
This functionality would enable the streaming layer to handle the reprocessing of business states by replaying computation with increased parallelism and resources.
With systems that can efficiently checkpoint and store large amounts of streaming state, the business state in the streaming layer is no longer an approximation;
this model has gained some traction with many ingest pipelines.
Still, even though the batch layer is eliminated in this model, the problem of having two different serving layers remains.

Many true stream processing systems today operate at record level, so speed serving systems should be optimized for record-level updates.
Typically, these systems cannot be optimized for analytical scans as well, unless the system has either a large chunk of its data in-memory (like Memsql) or aggressive indexes (like ElasticSearch).
These speed-serving systems sacrifice scalability and cost for optimized ingest and scan performance.
For this reason, data retention in these serving systems is typically limited, meaning that they can last 30 to 90 days or store up to a few TBs of data.
Analytics on older historical data is often redirected to query engines on HDFS where data latency is not an issue.

 

 

Back to the trade-off:
if a latency of roughly 10 minutes is tolerable, is there a lighter, faster way to ingest and prepare data?

For today's Hadoop, it is enough to add the following capabilities; most workloads do not truly need streaming:

- Updates, applied transactionally so that consistency is guaranteed
- Support for ad-hoc queries, using columnar storage for analytical (AP) scenarios
- The ability to propagate and chain incremental updates

In other words: add a bit of TP, a bit of AP, and a touch of streaming to Hadoop, and it becomes usable in every scenario, though best-in-class in none.

This fundamental tradeoff between data ingest latency, scan performance, and compute resources and operational complexity is unavoidable.
But for workloads that can tolerate latencies of about 10 minutes, there is no need for a separate “speed” serving layer if there is a faster way to ingest and prepare data in HDFS.
This unifies the serving layer and reduces the overall complexity and resource usage significantly.

However, for HDFS to become the unified serving layer, it needs to not only store a log of changesets (a system of record), but also support compacted, de-duplicated business states partitioned by a meaningful business metric. The following features are required for this type of unified serving layer:

  • Ability to quickly apply mutations to large HDFS datasets
  • Data storage options that are optimized for analytical scans (columnar file formats)
  • Ability to chain and propagate updates efficiently to modeled datasets

Compacted business state usually can’t avoid mutations, even if the business partition field is the time in which the event occurred. Ingestion can still result in updates to many older partitions because of late-arriving data and the difference between event and processing times. Even if the partition key is the processing time, there may still be a need for updates because of the demand to wipe out data for audit compliance or security reasons.

Hudi is the incremental framework built to meet the requirements above.

The architecture diagram then becomes the one below; the main changes are:

- A unified serving layer, with no split between real-time and historical data

- Record-level and batch updates are replaced by mini-batch updates

Note that a streaming system is still present, since something has to write the data in, but the streaming layer becomes much lighter: it is purely an ingestion layer.

Enter Hudi, an incremental framework that supports the above requirements outlined in our previous section.
In short, Hudi (Hadoop Upsert Delete and Incremental) is an analytical, scan-optimized data storage abstraction,
which enables applying mutations to data in HDFS on the order of few minutes and chaining of incremental processing.

Hudi datasets integrate with the current Hadoop ecosystem (including Apache Hive, Apache Parquet, Presto, and Apache Spark) through a custom InputFormat, making the framework seamless for the end user.

 

 

The figure below is interesting: it classifies use cases by their requirements.

The point it makes is that truly real-time streaming scenarios are rare; most can be covered by incremental processing instead.

The DataFlow model characterizes data pipelines based on their latency and completeness guarantees.
Figure 4, below, demonstrates how pipelines at Uber Engineering are distributed across this spectrum and what styles of processing are typically applied for each:

For the few use cases truly needing ~1 minute latencies and dashboards with simple business metrics, we rely on record-level stream processing.
For traditional batch use cases like machine learning and experiment effectiveness analysis, we rely on batch processing that excels at heavier computations.
For use cases where complex joins or significant data crunching is needed at near real-time latencies, we rely on Hudi and its incremental processing primitives to obtain the best of both worlds.
To learn more about the use cases supported by Hudi, you can check out our documentation on Github.

 

Hudi's storage hierarchy, from top to bottom: basePath, partition, (files, commits), records.

- A partition contains data files, each uniquely identified by a fileId; multiple files sharing the same fileId form an update chain, i.e. multiple versions of the same data.

- Each record has a unique record key, and its mapping to a fileId is fixed when the record is first written and never changes afterwards.

Hudi organizes a dataset into a partitioned directory structure under a basepath, similar to a traditional Hive table.
The dataset is broken up into partitions, which are directories containing data files for that partition.
Each partition is uniquely identified by its partitionpath relative to the basepath.
Within each partition, records are distributed into multiple data files.
Each data file is identified by both a unique fileId and the commit that produced the file.
In the case of updates, multiple data files can share the same fileId written at different commits.

Each record is uniquely identified by a record key and mapped to a fileId. 
This mapping between record key and fileId is permanent once the first version of a record has been written to a file.
In short, the fileId identifies a group of files that contain all versions of a group of records.
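
To make this concrete, here is a minimal sketch of what a Hudi dataset might look like on storage. The base path, partition values, fileIds, and file names are purely illustrative, and the exact file-naming convention is simplified for readability:

```
/data/hudi/trips/                              <- basepath (hypothetical)
├── .hoodie/                                   <- metadata directory: commit/clean/compaction timeline
├── 2022/04/24/                                <- one partition (partitionpath)
│   ├── fileId1_w1_20220424080000.parquet      <- first version of file group fileId1
│   └── fileId1_w1_20220425090000.parquet      <- newer version of the same fileId, from a later commit
└── 2022/04/25/
    └── fileId2_w1_20220425100000.parquet      <- a different file group in another partition
```

A record whose key was first written into fileId1 keeps mapping to fileId1 on every later update; newer commits simply add newer versions of that file group.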

Hudi storage is made up of the following parts:

- Metadata: the change log; from it you can tell which data each snapshot contains.
   Changes fall into:
   - commits: writes, identified by monotonically increasing timestamps
   - cleans: removal of older file versions that are no longer needed
   - compactions: data reorganization, such as converting row-based files into columnar ones

- Index: maps a record key to its fileId so the right file can be found quickly; implementations include Bloom filters and HBase.

- Data: the data files themselves, in pluggable formats: a scan-optimized format (ROFormat, Parquet by default) and a write-optimized format (WOFormat, Avro by default).

Hudi storage consists of three distinct parts:

  1. Metadata: Hudi maintains the metadata of all activity performed on the dataset as a timeline, which enables instantaneous views of the dataset.
    This is stored under a metadata directory in the basepath (a small listing sketch follows this list). Below we’ve outlined the types of actions in the timeline:
    • Commits: A single commit captures information about an atomic write of a batch of records into a dataset.
      Commits are identified by a monotonically increasing timestamp, denoting the start of the write operation.
    • Cleans: Background activity that gets rid of older versions of files in the dataset that will no longer be used in a running query.
    • Compactions: Background activity to reconcile differential data structures within Hudi (e.g. moving updates from row-based log files to columnar formats).
  2. Index: Hudi maintains an index to quickly map an incoming record key to a fileId if the record key is already present.
    Index implementation is pluggable and the following are the options currently available:
    • Bloom filter stored in each data file footer: The preferred default option, since there is no dependency on any external system. Data and index are always consistent with one another.
    • Apache HBase: Efficient lookup for a small batch of keys. This option is likely to shave off a few seconds during index tagging.
  3. Data: Hudi stores all ingested data in two different storage formats.
    The actual formats used are pluggable, but fundamentally require the following characteristics:
    • Scan-optimized columnar storage format (ROFormat). Default is Apache Parquet.
    • Write-optimized row-based storage format (WOFormat). Default is Apache Avro.
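
To peek at the timeline described in item 1 above, one can simply list the .hoodie directory. The sketch below is illustrative only: the base path is a placeholder, and the file extensions shown reflect the timeline naming used by recent Hudi releases.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// List completed actions recorded on the timeline under the .hoodie directory.
val metaPath = new Path("/data/hudi/trips/.hoodie")      // placeholder base path + metadata dir
val fs = metaPath.getFileSystem(new Configuration())
fs.listStatus(metaPath)
  .map(_.getPath.getName)
  .filter(n => n.endsWith(".commit") || n.endsWith(".deltacommit") || n.endsWith(".clean"))
  .sorted
  .foreach(println)   // e.g. 20220425160230.commit  (instant time + action type)
```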

A key optimization at the storage layer is compaction; for any append-only storage design, compaction is a mandatory optimization.

Hudi storage is optimized for HDFS usage patterns.
Compaction is the critical operation to convert data from a write-optimized format to a scan-optimized format.
Since the fundamental unit of parallelism for a compaction is rewriting a single fileId, Hudi ensures all data files are written out as HDFS block-sized files to balance compaction parallelism,
query scan parallelism, and the total number of files in HDFS.
Compaction is also pluggable, which can be extended to stitch older, less frequently updated data files to further reduce the total number of files.

 

Write path

Hudi is currently a Spark library, so writes can be invoked from Spark Streaming; if the latency requirement is loose, a workflow scheduler such as Airflow can drive the ingest instead.

Hudi is a Spark library that is intended to be run as a streaming ingest job, and ingests data as mini-batches (typically on the order of one to two minutes).
However, depending on latency requirements and resource negotiation time, the ingest jobs can also be run as scheduled tasks using Apache Oozie or Apache Airflow.
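
As a concrete illustration of such an ingest job, below is a minimal Spark (Scala) sketch that upserts a batch into a Hudi table through the DataSource API. The table name, paths, and column names (`uuid`, `ts`, `partition`) are assumptions made for the example, and the option keys reflect current Hudi releases rather than the version described in the original post.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder()
  .appName("hudi-upsert-example")
  .getOrCreate()

// Hypothetical input batch with a record key column `uuid`, an ordering
// (precombine) column `ts`, and a partition column `partition`.
val batch = spark.read.json("/tmp/incoming_batch")

batch.write.format("hudi")
  .option("hoodie.table.name", "trips")                               // hypothetical table name
  .option("hoodie.datasource.write.recordkey.field", "uuid")          // record key field
  .option("hoodie.datasource.write.partitionpath.field", "partition") // partition path field
  .option("hoodie.datasource.write.precombine.field", "ts")           // latest value wins among duplicates
  .option("hoodie.datasource.write.operation", "upsert")              // also: insert, delete, bulk_insert, ...
  .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")      // or COPY_ON_WRITE
  .mode(SaveMode.Append)
  .save("/data/hudi/trips")                                           // hypothetical base path
```

Wrapped inside a Structured Streaming foreachBatch, the same write becomes the mini-batch ingest job described above.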

 

The full write flow:

First, load the Bloom filters of all the relevant files so Hudi can tell whether each record already exists and, if so, in which file; every record is then tagged as either an update or an insert.

Next, group the insert records by partition, assign a new fileId, and write data until the HDFS block size is reached; if records remain, create another file and keep writing.

Then, for updates, locate the corresponding file and append to its log.

Finally, record the commit in the metadata timeline.

The steps below also describe the compaction process: scheduled periodically, mutually excluded via ZooKeeper locks, and prioritized.

The following is the write path for a Hudi ingestion with the default configuration:

  1. Hudi loads the Bloom filter index from all parquet files in the involved partitions (meaning, partitions spread from the input batch)
    and tags the record as either an update or insert by mapping the incoming keys to existing files for updates.
    The join here could skew on input batch size, partition spread, or number of files in a partition.
    It is handled automatically by doing a range partitioning on a joined key and sub-partitioning it to avoid the notorious 2GB limit for a remote shuffle block in Spark (a conceptual sketch of this tagging step appears after this list).
  2. Hudi groups inserts per partition, assigns a new fileId, and appends to the corresponding log file until the log file reaches the HDFS block size.
    Once the block size is reached, Hudi creates another fileId and repeats this process for all inserts in that partition.
    1. A time-limited compaction process is kicked off by a scheduler every few minutes,
      which generates a prioritized list of compactions and compacts all the avro files for a fileId with the current parquet file to create the next version of that parquet file.
    2. Compaction runs asynchronously, locking down specific log versions being compacted and writing new updates to that fileId into a new log version. Locks are obtained in Zookeeper.
    3. Compactions are prioritized based on the size of the log data being compacted, and are pluggable with a compaction strategy.
      On every compaction iteration, the files with the largest amount of logs are compacted first, while small log files are compacted last,
      since the cost of re-writing the parquet file is not amortized on the number of updates to the file.
  3. Hudi appends updates for a fileId to its corresponding log file if one exists or creates a log file if one doesn’t exist.
  4. If the ingest job succeeds, a commit is recorded in the Hudi meta timeline,
    which atomically renames an inflight file to a commit file and writes out details about partitions and the fileId version created.
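
The tagging in step 1 can be pictured with the following conceptual Scala sketch. This is not Hudi's actual implementation, just an illustration of how per-file Bloom filters narrow an incoming key down to a candidate fileId; the real code then verifies candidates against the actual keys stored in those files.

```scala
// Conceptual sketch of insert/update tagging; types and logic are illustrative only.
case class IncomingRecord(key: String, partitionPath: String)
case class FileBloom(fileId: String, mightContain: String => Boolean)

def tagRecords(batch: Seq[IncomingRecord],
               bloomsByPartition: Map[String, Seq[FileBloom]]): Seq[(IncomingRecord, Option[String])] =
  batch.map { rec =>
    // Bloom filters can return false positives, so real code re-checks the candidate files' keys.
    val candidate = bloomsByPartition
      .getOrElse(rec.partitionPath, Seq.empty)
      .find(_.mightContain(rec.key))
      .map(_.fileId)
    (rec, candidate) // None => tagged as insert; Some(fileId) => tagged as update to that file group
  }
```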

Hudi tries to keep file sizes close to the block size.

As discussed before, Hudi strives to align file size with the underlying block size.
Depending on the efficiency of columnar compression and the volume of data in a partition to compact, compaction can still create small parquet files.
This is eventually autocorrected in the next iterations of ingestion, since inserts to a partition are packed as updates to existing small files.
Eventually, file sizes will grow to reach the underlying block size on compaction.

 

Query path

The commit meta timeline enables both a read-optimized view and a realtime view of the same data in HDFS;
these views let the client choose between data latency and query execution time.
Hudi provides these views with a custom InputFormat, and includes a Hive registration module which registers both these views as Hive metastore tables.
Both of these input formats understand fileId and commit time, and filter the files to pick only the most recently committed files.
Then, Hudi generates splits on those data files to run the query plan. InputFormat details are outlined below:

  • HoodieReadOptimizedInputFormat: Provides a scan-optimized view which filters out all the log files and just picks the latest versions of compacted parquet files.
  • HoodieRealtimeInputFormat: Provides a more real-time view which, in addition to picking the latest versions of compacted parquet files, also provides a RecordReader to merge the log files with their corresponding parquet files during a scan.

Both InputFormats extend MapredParquetInputFormat and VectorizedParquetRecordReader so all optimizations done for reading parquet files still apply.
Presto and SparkSQL work out of the box on the Hive metastore tables, provided the required hoodie-hadoop-mr library is in classpath.

 

As previously stated, modeled tables need to be processed and served in HDFS for HDFS to become the unified serving layer.
Building low-latency modeled tables requires the ability to chain incremental processing of HDFS datasets.
Since Hudi maintains metadata about commit times and file versions created for every commit, incremental changeset can be pulled from a Hudi-specific dataset within a start timestamp and an end timestamp.

This process works much in the same way as a normal query, except that the specific file versions that fall within the query time range are picked instead of just the latest version,
and an additional predicate about the commit time is pushed onto the file scan to retrieve only the records that changed in the requested duration.
The duration for which changesets can be obtained is determined by how many versions of data files can be left uncleaned.

This enables stream-to-stream joins with watermarks and stream-to-dataset joins to compute and upsert modeled tables in HDFS.
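
A hedged sketch of such an incremental pull through the Spark DataSource follows; the instant times and path are placeholders, and the option keys are those of current Hudi releases.

```scala
// Pull only the records that changed between two commit instants on the timeline.
val changes = spark.read.format("hudi")
  .option("hoodie.datasource.query.type", "incremental")
  .option("hoodie.datasource.read.begin.instanttime", "20220425000000") // lower bound (placeholder)
  .option("hoodie.datasource.read.end.instanttime", "20220425120000")   // optional upper bound (placeholder)
  .load("/data/hudi/trips")

// Each row carries Hudi meta columns such as _hoodie_commit_time, which downstream
// incremental pipelines can use to upsert derived (modeled) tables.
changes.select("_hoodie_commit_time", "uuid", "ts").show()
```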

 

 

Apache Hudi - The Data Lake Platform

https://hudi.apache.org/blog/2021/07/21/streaming-data-lake-platform/

Hudi's place in the Hadoop ecosystem is analogous to Kafka's place in stream processing.

Hudi completes the missing pieces of the puzzle by providing streaming optimized lake storage, much like how Kafka/Pulsar enable efficient storage for event streaming. 

Hudi supports transactions and mutation, and compared with traditional warehouses that offer rich transactional guarantees, its server-less design gives it much stronger scalability.

But first, we needed to tackle the basics - transactions and mutability - on the data lake.
In many ways, Apache Hudi pioneered the transactional data lake movement as we know it today.
Specifically, during a time when more special-purpose systems were being born, Hudi introduced a server-less, transaction layer,
which worked over the general-purpose Hadoop FileSystem abstraction on Cloud Stores/HDFS.
This model helped Hudi to scale writers/readers to 1000s of cores on day one, compared to warehouses which offer a richer set of transactional guarantees
but are often bottlenecked by the 10s of servers that need to handle them. 

 

Hudi Stack

Components marked with a star are in progress.

Components drawn with dashed lines are planned for the future.

The sections that follow introduce these components one by one.

 

Lake Storage

Hudi works against the Hadoop FileSystem API, so any storage that implements that API is compatible.
Hudi also prefers append, where the underlying storage supports it, to avoid an explosion in the number of files.

Hudi interacts with lake storage using the Hadoop FileSystem API, which makes it compatible with all of its implementations ranging from HDFS to Cloud Stores to even in-memory filesystems like Alluxio/Ignite.
Hudi internally implements its own wrapper filesystem on top to provide additional storage optimizations (e.g: file sizing), performance optimizations (e.g: buffering), and metrics.
Uniquely, Hudi takes full advantage of append support, for storage schemes that support it, like HDFS.
This helps Hudi deliver streaming writes without causing an explosion in file counts/table metadata.
Unfortunately, most cloud/object storages do not offer append capability today (except maybe Azure).
In the future, we plan to leverage the lower-level APIs of major cloud object stores, to provide similar controls over file counts at streaming ingest latencies.

 

File Format

A base file plus the set of delta files on top of it is called a file slice.

The file formats are pluggable: base files can be Parquet or HFile, and delta files are typically Avro.

Each compaction produces a new file slice.

Hudi is designed around the notion of base file and delta log files that store updates/deltas to a given base file (called a file slice).
Their formats are pluggable, with Parquet (columnar access) and HFile (indexed access) being the supported base file formats today.
The delta logs encode data in Avro (row oriented) format for speedier logging (just like Kafka topics for e.g).
Going forward, we plan to inline any base file format into log blocks in the coming releases, providing columnar access to delta logs depending on block sizes.
Future plans also include Orc base/log file formats, unstructured data formats (free form json, images),
and even tiered storage layers in event-streaming systems/OLAP engines/warehouses, working with their native file formats.

Zooming one level up, Hudi's unique file layout scheme encodes all changes to a given base file,
as a sequence of blocks (data blocks, delete blocks, rollback blocks) that are merged in order to derive newer base files.
In essence, this makes up a self-contained redo log that lets us implement interesting features on top.
For e.g, most of today's data privacy enforcement happens by masking data read off the lake storage on-the-fly,
invoking hashing/encryption algorithms over and over on the same set of records and incurring significant compute overhead/cost.
Users would be able to keep multiple pre-masked/encrypted copies of the same key in the logs and hand out the correct one based on a policy, avoiding all the overhead.

 

Table Format

A table adds schema and metadata on top of the files.
The schema is managed with Avro schemas, and Hudi is schema-on-write: schema validity is checked at write time.

Files are grouped within a table/partition, and all updates go into delta files tied to a specific group, so merges never have to be global; Hive ACID, by contrast, has to merge globally.

The metadata is the timeline, which records all actions; note the merge-on-read pattern: writes are append-only, and merging happens at read time.

The metadata table uses the HFile format, which is indexed for fast lookups, so there is no need to scan the whole table.

The term “table format” is new and still means many things to many people.
Drawing an analogy to file formats, a table format simply consists of : the file layout of the table, table’s schema and metadata tracking changes to the table.
Hudi is not a table format, it implements one internally.
Hudi uses Avro schemas to store, manage and evolve a table’s schema.
Currently, Hudi enforces schema-on-write, which although stricter than schema-on-read, is adopted widely 
in the stream processing world to ensure pipelines don't break from non backwards compatible changes.

Hudi consciously lays out files within a table/partition into groups and maintains a mapping between an incoming record’s key to an existing file group.
All updates are recorded into delta log files specific to a given file group and this design ensures low merge overhead compared to approaches like Hive ACID,
which have to merge all delta records against all base files to satisfy queries.
For e.g, with uuid keys (used very widely) all base files are very likely to overlap with all delta logs, rendering any range based pruning useless.
Much like state stores, Hudi’s design anticipates fast key based upserts/deletes and only requires merging delta logs within each file group.
This design choice also lets Hudi provide more capabilities for writing/querying as we will explain below.

 

 

The timeline is the source-of-truth event log for all Hudi’s table metadata, stored under the .hoodie folder, that provides an ordered log of all actions performed on the table.
Events are retained on the timeline up to a configured interval of time/activity.
Each file group is also designed as its own self-contained log, which means that even if an action that affected a file group is archived from the timeline,
the right state of the records in each file group can be reconstructed by simply locally applying the delta logs to the base file.
This design bounds the metadata size, proportional to how often the table is being written to/operated on, independent of how large the entire table is.
This is a critical design element needed for supporting frequent writes/commits to tables.

Lastly, new events on the timeline are then consumed and reflected onto an internal metadata table,
implemented as another merge-on-read table offering low write amplification.
Hudi is able to absorb quick/rapid changes to table’s metadata, unlike table formats designed for slow-moving data.
Additionally, the metadata table uses the HFile base file format, which provides indexed lookups of keys avoiding the need for reading the entire metadata table to satisfy metadata reads.
It currently stores all the physical file paths that are part of the table, to avoid expensive cloud file listings.

A key challenge faced by all the table formats out there today,
is the need for expiring snapshots/controlling retention for time travel queries such that it does not interfere with query planning/performance.
In the future, we plan to build an indexed timeline in Hudi, which can span the entire history of the table, supporting a time travel look back window of several months/years.

 

Indexes

Beyond file listings and column statistics in the metadata,
Hudi also provides Bloom filter and HBase-backed indexes that map records to their file groups.

Indexes help databases plan better queries, that reduce the overall amount of I/O and deliver faster response times.
Table metadata about file listings and column statistics are often enough for lake query engines to generate optimized, engine specific query plans quickly.
This is however not sufficient for Hudi to realize fast upserts.
Hudi already supports different key based indexing schemes to quickly map incoming record keys into the file group they reside in.
For this purpose, Hudi exposes a pluggable indexing layer to the writer implementations,
with built-in support for range pruning (when keys are ordered and largely arrive in order) using interval trees and bloom filters (e.g: for uuid based keys where ordering is of very little help).
Hudi also implements a HBase backed external index which is much more performant although more expensive to operate.
Hudi also consciously exploits the partitioning scheme of the table to implement global and non-global indexing schemes.
Users can choose to enforce key constraints only within a partition,
in return for O(num_affected_partitions) upsert performance as opposed to O(total_partitions) in the global indexing scenarios.
We refer you to this blog, that goes over indexing in detail.
Ultimately, Hudi's writer path ensures the index is always kept in sync with the timeline and data, which is cumbersome and error prone to implement on top of a table format by hand.
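
To make the pluggable indexing concrete, the fragment below sketches how a writer might select an index type via write options. The index type values are the standard ones exposed by current Hudi releases; the ZooKeeper quorum and HBase table name are placeholders, and `batch` refers to the DataFrame from the earlier write sketch.

```scala
batch.write.format("hudi")
  // ... plus the record key / precombine / partition options from the earlier sketch ...
  .option("hoodie.index.type", "BLOOM")  // default: bloom filters in data file footers, per-partition keys
  // Alternatives:
  //   "GLOBAL_BLOOM"          - enforce key uniqueness across all partitions (O(total_partitions) upserts)
  //   "SIMPLE"/"GLOBAL_SIMPLE" - plain join against existing keys, no bloom filters
  //   "HBASE"                 - external index; also set e.g.:
  //     .option("hoodie.index.hbase.zkquorum", "zk1,zk2,zk3")   // placeholder quorum
  //     .option("hoodie.index.hbase.zkport", "2181")
  //     .option("hoodie.index.hbase.table", "hudi_index")       // placeholder HBase table
  .mode(SaveMode.Append)
  .save("/data/hudi/trips")
```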

 

In the future, we intend to add additional forms of indexing as new partitions on the metadata table.
Let’s discuss the role each one has to play briefly.
Query engines typically rely on partitioning to cut down the number of files read for a given query.
In database terms, a Hive partition is nothing but a coarse range index, that maps a set of columns to a list of files.
Table formats born in the cloud like Iceberg/Delta Lake, have built-in tracking of column ranges per file in a single flat file (json/avro),
that helps avoid planning costs for large/poorly sized tables.
This need has been largely reduced for Hudi tables thus far, given Hudi automatically enforces file sizes, which helps bound the time taken to read stats out of parquet footers, for example.
However, with the advent of features like clustering, there is a need for writing smaller files first and then reclustering in a query-optimized way.
We plan to add indexed column ranges that can scale to lots of small files and support faster mutations. See RFC-27 to track the design process and get involved.

 

While Hudi already supports external indexes for random write workloads, we would like to support point-lookup-ish queries right on top of lake storage,
which helps avoid the overhead of an additional database for many classes of data applications.
We also anticipate that uuid/key based joins will be sped up a lot by leveraging the record level indexing schemes we build out for fast upsert performance.
We also plan to move our tracking of bloom filters out of the file footers and into its own partition on the metadata table.
Ultimately, we look to exposing all of this to the queries as well in the coming releases.

 

Concurrency Control

Concurrency control defines how different writers/readers coordinate access to the table.
Hudi ensures atomic writes, by way of publishing commits atomically to the timeline, stamped with an instant time that denotes the time at which the action is deemed to have occurred.
Unlike general purpose file version control,
Hudi draws clear distinction between writer processes (that issue user’s upserts/deletes),
table services (that write data/metadata to optimize/perform bookkeeping) and readers (that execute queries and read data).
Hudi provides snapshot isolation between all three types of processes, meaning they all operate on a consistent snapshot of the table.
Hudi provides optimistic concurrency control (OCC) between writers, while providing lock-free, non-blocking MVCC 
based concurrency control between writers and table-services and between different table services.

Projects that solely rely on OCC deal with competing operations, by either implementing a lock or relying on atomic renames.
Such approaches are optimistic that real contention never happens and resort to failing one of the writer operations if conflicts occur,
which can cause significant resource wastage or operational overhead.
Imagine a scenario of two writer processes :
an ingest writer job producing new data every 30 minutes and a deletion writer job that is enforcing GDPR taking 2 hours to issue deletes.
If they were to overlap on the same files (very likely to happen in real situations with random deletes),
the deletion job is almost guaranteed to starve and fail to commit each time, wasting tons of cluster resources.
Hudi takes a very different approach that we believe is more apt for lake transactions, which are typically long-running.
For example, async compaction can keep deleting records in the background without blocking the ingest job.
This is implemented via a file level, log based concurrency control protocol which orders actions based on their start instant times on the timeline.

We are hard at work improving our OCC based implementation around early detection of conflicts for concurrent writers, so they can terminate early without burning up CPU resources.
We are also working on adding fully log based, non-blocking concurrency control between writers,
where writers proceed to write deltas and conflicts are resolved later in some deterministic timeline order - again much like how stream processing programs are written.
This is possible only due to Hudi’s unique design that sequences actions into an ordered event log and the transaction handling code is aware of the relationship/interdependence of actions to each other. 
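
As an illustration, the options below sketch how multi-writer OCC is typically enabled in recent Hudi releases using a ZooKeeper-based lock provider. The ZooKeeper address, lock key, and base path are placeholders, exact keys may vary across versions, and `batch` again refers to the DataFrame from the earlier write sketch.

```scala
batch.write.format("hudi")
  // ... plus the record key / precombine / partition options from the earlier sketch ...
  .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
  .option("hoodie.cleaner.policy.failed.writes", "LAZY") // let the cleaner roll back failed writes lazily
  .option("hoodie.write.lock.provider",
          "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider")
  .option("hoodie.write.lock.zookeeper.url", "zk1,zk2,zk3")  // placeholder quorum
  .option("hoodie.write.lock.zookeeper.port", "2181")
  .option("hoodie.write.lock.zookeeper.lock_key", "trips")   // placeholder lock key, usually the table name
  .option("hoodie.write.lock.zookeeper.base_path", "/hudi/locks")
  .mode(SaveMode.Append)
  .save("/data/hudi/trips")
```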

 

Writers

Hudi tables can be used as sinks for Spark/Flink pipelines and the Hudi writing path provides several enhanced capabilities over file writing done by vanilla parquet/avro sinks.
Hudi classifies write operations carefully into
incremental operations (insert, upsert, delete)
and batch/bulk operations (insert_overwrite, insert_overwrite_table, delete_partition, bulk_insert)
and provides relevant functionality for each operation in a performant and cohesive way.
Both upsert and delete operations automatically handle merging of records with the same key in the input stream (say, a CDC stream obtained from an upstream table), then look up the index,
and finally invoke a bin packing algorithm to pack data into files, while respecting a pre-configured target file size.
An insert operation on the other hand, is intelligent enough to avoid the precombining and index lookup, while retaining the benefits of the rest of the pipeline.
Similarly, bulk_insert operation provides several sort modes for controlling initial file sizes and file counts, when importing data from an external table to Hudi.
The other batch write operations provide MVCC based implementations of typical overwrite semantics used in batch data pipelines,
while retaining all the transactional and incremental processing capabilities,
making it seamless to switch between incremental pipelines for regular runs and batch pipelines for backfilling/dropping older partitions.
The write pipeline also contains lower layers optimizations around handling large merges by spilling to rocksDB or an external spillable map, multi-threaded/concurrent I/O to improve write performance.

Keys are first class citizens inside Hudi and the pre-combining/index lookups done before upsert/deletes ensure a key is unique across partitions or within partitions, as desired.
In contrast with other approaches where this is left to data engineer to co-ordinate using MERGE INTO statements, this approach ensures quality data especially for critical use-cases.
Hudi also ships with several built-in key generators that can parse all common date/timestamps, handle malformed data with an extensible framework for defining custom key generators.
Keys are also materialized with the records using the _hoodie_record_key meta column, which makes it possible to change the key fields and perform repairs on older data with incorrect keys for e.g.
Finally, Hudi provides a HoodieRecordPayload interface that is very similar to processor APIs in Flink or Kafka Streams, and allows for expressing arbitrary merge conditions between the base and delta log records.
This allows users to express partial merges (e.g log only updated columns to the delta log for efficiency) and avoid reading all the base records before every merge.
Routinely, we find users leverage such custom merge logic during replaying/backfilling older data onto a table, while ensuring newer updates are not overwritten causing the table's snapshot to go back in time.
This is achieved by simply using the HoodieDefaultPayload, where the latest value for a given key is picked based on a configured precombine field value in the data.
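
A minimal sketch of wiring this up through write options follows. The class shown, `DefaultHoodieRecordPayload`, is the stock payload in current releases that picks the latest value by precombine field during both ingest and merge; the field names are assumptions carried over from the earlier sketches, and `batch` is the DataFrame from the earlier write sketch.

```scala
batch.write.format("hudi")
  // ... plus the record key / partition options from the earlier sketch ...
  .option("hoodie.datasource.write.precombine.field", "ts")  // ordering field used to resolve conflicts
  .option("hoodie.datasource.write.payload.class",
          "org.apache.hudi.common.model.DefaultHoodieRecordPayload")
  // A custom merge (e.g. partial-column updates) would instead point this option
  // at a user-supplied HoodieRecordPayload implementation.
  .mode(SaveMode.Append)
  .save("/data/hudi/trips")
```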

Hudi writers add metadata to each record that codifies the commit time and a sequence number for each record within that commit (comparable to a Kafka offset),
which make it possible to derive record level change streams.
Hudi also provides users the ability to specify event time fields in incoming data streams and track them in the timeline.
Mapping these to stream processing concepts, Hudi contains both arrival and event time for records for each commit, that can help us build good watermarks that inform complex incremental processing pipelines.
In the near future, we are looking to add new metadata columns, that encode the source operation (insert, update, delete) for each record,
before we embark on this grand goal of full end-end incremental ETL pipelines.
All said, we realized many users may simply want to use Hudi as an efficient write layer that supports transactions, fast updates/deletes.
We are looking into adding support for virtual keys and making the meta columns optional, to lower storage overhead, while still making rest of Hudi's capabilities (metadata table, table services, ..) available.

 

Readers

Hudi provides snapshot isolation between writers and readers and allows for any table snapshot to be queried consistently from all major lake query engines
(Spark, Hive, Flink, Presto, Trino, Impala) and even cloud warehouses like Redshift.
In fact, we would love to bring Hudi tables as external tables with BigQuery/Snowflake as well, once they also embrace the lake table formats more natively.
Our design philosophy around query performance has been to make Hudi as lightweight as possible whenever only base columnar files are read (CoW snapshot, MOR read-optimized queries),
employing the engine specific vectorized readers in Presto, Trino, and Spark, for example.
This model is far more scalable than maintaining our own readers, and users benefit from engine specific optimizations; for example, Presto and Trino each have their own data/metadata caches.
Whenever Hudi has to merge base and log files for a query, Hudi takes control and employs several mechanisms (spillable maps, lazy reading) to improve merge performance,
while also providing a read-optimized query on the data that trades off data freshness for query performance.
In the near future, we are investing deeply into improving MoR snapshot query performance in many ways such as inlining parquet data, special handling of overwrite payloads/merges.

True to its design goals, Hudi provides some very powerful incremental querying capabilities that tie together the meta fields added during writing and the file group based storage layout.
While table formats that merely track files are only able to provide information about files that changed during each snapshot or commit,
Hudi generates the exact set of records that changed given a point in the timeline, due to tracking of record level event and arrival times.
Furthermore, this design allows large commits to be consumed in smaller chunks by an incremental query, fully decoupling the writing and incremental querying of data.
Time travel is merely implemented as an incremental query that starts and stops at an older portion of the timeline.
Since Hudi ensures that a key is atomically mapped to a single file group at any point in time,
it makes it possible to support full CDC capabilities on Hudi tables, such as providing all possible values for a given record since time t, CDC streams with both before and after images.
All of these functionalities can be built local to each file group, given each file group is a self-contained log.
Much of our future work in this area will be around bringing such a powerful set of debezium like capabilities to life in the coming months.

 

Table Services

This section covers the table-level services: archival, cleaner, compaction, clustering, bootstrap, and so on.

What defines and sustains a project’s value over years are its fundamental design principles and the subtle trade offs.
Databases often consist of several internal components, working in tandem to deliver efficiency, performance and great operability to its users.
True to intent to act as state store for incremental data pipelines,
we designed Hudi with built-in table services and self-managing runtime that can orchestrate/trigger these services to optimize everything internally.
In fact, if we compare rocksDB (a very popular stream processing state-store) and Hudi’s components, the similarities become obvious.

There are several built-in table services, all with the goal of ensuring performant table storage layout and metadata management,
which are automatically invoked either synchronously after each write operation, or asynchronously as a separate background job.
Furthermore, Spark (and Flink) streaming writers can run in continuous mode, and invoke table services asynchronously sharing the underlying executors intelligently with writers.
The archival service ensures that the timeline holds sufficient history for inter-service coordination (e.g. compactions wait for other compactions to complete on the same file group) and for incremental queries.
Once events expire from the timeline, the archival service cleans up any side effects from lake storage (e.g. rolling back failed concurrent transactions).
Hudi's transaction management implementation allows all of these services to be idempotent and thus resilient to failure via just simple retries. 
Cleaner service works off the timeline incrementally (eating our own incremental design dog food),
removing file slices that are past the configured retention period for incremental queries, while also allowing sufficient time for long running batch jobs (e.g Hive ETLs) to finish running.
Compaction service comes with built-in strategies (date partitioning based, I/O bounded),
that merges a base file with a set of delta log files to produce new base file, all while allowing writes to happen concurrently to the file group.
This is only possible due to Hudi's grouping of files into groups and support for flexible log merging,
and unlocks non-blocking execution of deletes while concurrent updates are being issued to the same set of records.
Clustering service functions similar to what users find in BigQuery or Snowflake,
where users can group records that are often queried together by sort keys or control file sizes by coalescing smaller base files into larger ones.
Clustering is fully aware of other actions on the timeline such as cleaning, compaction,
and it helps Hudi implement intelligent optimizations like avoiding compaction on file groups that are already being clustered, to save on I/O.
Hudi also performs rollback of partial writes and cleans up any uncommitted data from lake storage, by use of marker files that track any files created as a part of write operations.
Finally, the bootstrap service performs one time zero copy migration of plain parquet tables to Hudi, while allowing both pipelines to operate in parallel, for data validation purposes.
Cleaner service is once again aware of these bootstrapped base files and can optionally clean them up, to ensure use-cases like GDPR compliance are met.
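
For illustration, the write options below sketch how these services are commonly tuned from a writer in recent Hudi releases. The thresholds and sort columns are placeholder values rather than recommendations, and `batch` is the DataFrame from the earlier write sketch.

```scala
batch.write.format("hudi")
  // ... plus the record key / precombine / partition options from the earlier sketch ...
  // Compaction (merge-on-read): trigger after a number of delta commits.
  .option("hoodie.compact.inline", "false")                  // false => compact asynchronously
  .option("hoodie.compact.inline.max.delta.commits", "5")    // placeholder threshold
  // Cleaning: bound how many older file slices (and thus incremental-query history) are kept.
  .option("hoodie.cleaner.commits.retained", "10")           // placeholder retention
  // Clustering: coalesce small base files and/or re-sort data for query locality.
  .option("hoodie.clustering.inline", "true")
  .option("hoodie.clustering.inline.max.commits", "4")       // placeholder trigger
  .option("hoodie.clustering.plan.strategy.sort.columns", "partition,uuid") // placeholder sort keys
  .mode(SaveMode.Append)
  .save("/data/hudi/trips")
```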

We are always looking for ways to improve and enhance our table services in meaningful ways.
In the coming releases, we are working towards a much more scalable model of cleaning up partial writes,
by consolidating marker file creation using our timeline metaserver, which avoids expensive full table scans to seek out and remove uncommitted files.
We also have various proposals to add more clustering schemes, unlock clustering with concurrent updates using fully log based concurrency control.

 

Data Services

On top of the table services, Hudi provides scenario-specific tooling; the main one here is the DeltaStreamer.

As noted at the start, we wanted to make Hudi immediately usable for common end-end use-cases and thus invested deeply into a set of data services,
that provide functionality that is data/workload specific, sitting on top of the table services, writers/readers directly.
Foremost in that list, is the Hudi DeltaStreamer utility,
which has been an extremely popular choice for painlessly building a data lake out of Kafka streams and files landing in different formats on top of lake storage.
Over time, we have also built out sources that cover all major systems like a JDBC source for RDBMS/other warehouses, Hive source and even incrementally pulling data from other Hudi tables.
The utility supports automatic checkpoint management tracking source checkpoints as a part of target Hudi table metadata, with support for backfills/one-off runs.
DeltaStreamer also integrates with major schema registries such as Confluent's and also provides checkpoint translation from other popular mechanisms like Kafka connect.
It also supports de-duplication of data, multi-level configuration management system, built in transformers that take arbitrary SQL or coerce CDC log changes into writable forms,
that combined with other aforementioned features can be used for deploying production grade incremental pipelines.
Finally, just like the Spark/Flink streaming writers, DeltaStreamer is able to run in a continuous mode, with automatic management of table services.
Hudi also provides several other tools for snapshotting and incrementally exporting Hudi tables, also importing/exporting/bootstrapping new tables into Hudi.
Hudi also provides commit notifications into Http endpoints or Kafka topics, about table commit activity,
which can be used for analytics or building data sensors in workflow managers like Airflow to trigger pipelines.

 

 

Going forward, we would love contributions to enhance our multi delta streamer utility,
which can ingest entire Kafka clusters in a single large Spark application, so that it is hardened and on par with the DeltaStreamer.
To further our progress towards end-end complex incremental pipelines, we plan to work towards enhancing the delta streamer utility
and its SQL transformers to be triggered by multiple source streams (as opposed to just the one today) and unlock materialized views at scale.
We would like to bring an array of useful transformers that perform masking or data monitoring, and extend support for egress of data off Hudi tables into other external sinks as well.
Finally, we would love to merge the FlinkStreamer and the DeltaStreamer utilities into one cohesive utility, that can be used across engines.
We are constantly improving existing sources (e.g. support for parallelized listings of DFS sources) and adding new ones (e.g. an S3 event based DFS source).

 

Timeline Metaserver

Storing and serving table metadata right on the lake storage is scalable, but can be much less performant compared to RPCs against a scalable meta server.
Most cloud warehouses internally are built on a metadata layer that leverages an external database (e.g Snowflake uses foundationDB).
Hudi also provides a metadata server, called the “Timeline server”, which offers an alternative backing store for Hudi’s table metadata.
Currently, the timeline server runs embedded in the Hudi writer processes, serving file listings out of a local rocksDB store/Javalin REST API during the write process,
without needing to repeatedly list the cloud storage.
Given we have hardened this as the default option since our 0.6.0 release, we are considering standalone timeline server installations,
with support for horizontal scaling, database/table mappings, security and all the features necessary to turn it into a highly performant next generation lake metastore.

 

Lake Cache

There is a fundamental tradeoff today in data lakes between faster writing and great query performance.
Faster writing typically involves writing smaller files (and later clustering them) or logging deltas (and later merging on read).
While this provides good performance already, the pursuit of great query performance often warrants opening fewer files/objects on lake storage
and maybe pre-materializing the merges between base and delta logs.
After all, most databases employ a buffer pool or block cache, to amortize the cost of accessing storage.
Hudi already contains several design elements that are conducive for building a caching tier (write-through or even just populated by an incremental query),
that will be multi-tenant and can cache pre-merged images of the latest file slices, consistent with the timeline.
Hudi timeline can be used to simply communicate caching policies, just like how we perform inter table service co-ordination.
Historically, caching has been done closer to the query engines or via intermediate in-memory file systems.
By placing a caching tier closer and more tightly integrated with a transactional lake storage like Hudi,
all query engines would be able to share and amortize the cost of the cache, while supporting updates/deletes as well.
We look forward to building a buffer pool for the lake that works across all major engines, with the contributions from the rest of the community.

 

Source: https://www.cnblogs.com/fxjwind/p/16190583.html
