Record Payload

Background

One of the core features of Hudi is the ability to incrementally upsert data, deduplicate and merge records on the fly. Additionally, users can implement their custom logic to merge the input records with the record on storage. Record payload is an abstract representation of a Hudi record that allows the aforementioned capability. As we shall see below, Hudi provides out-of-box support for different payloads for different use cases. But, first let us understand how record payload is used in the Hudi upsert path.

upsert_path.png

Figure above shows the main stages that records go through while being written to the Hudi table. In the precombining stage, Hudi performs any deduplication based on the payload implementation and precombine key configured by the user. Further, on index lookup, Hudi identifies which records are being updated and the record payload implementation tells Hudi how to merge the incoming record with the existing record on storage.

Configs

Payload class can be specified using the below configs. For more advanced configs refer here

Spark based configs;

Config NameDefaultDescription
hoodie.datasource.write.payload.classorg.apache.hudi.common.model.OverwriteWithLatestAvroPayload (Optional)Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting. This will render any value set for PRECOMBINE_FIELD_OPT_VAL in-effective

Config Param: WRITE_PAYLOAD_CLASS_NAME

Flink based configs:

Config NameDefaultDescription
payload.classorg.apache.hudi.common.model.EventTimeAvroPayload (Optional)Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting. This will render any value set for the option in-effective

Config Param: PAYLOAD_CLASS_NAME

Existing Payloads

OverwriteWithLatestAvroPayload

  1. hoodie.datasource.write.payload.class=org.apache.hudi.common.model.OverwriteWithLatestAvroPayload

This is the default record payload implementation. It picks the record with the greatest value (determined by calling .compareTo() on the value of precombine key) to break ties and simply picks the latest record while merging. This gives latest-write-wins style semantics.

DefaultHoodieRecordPayload

  1. hoodie.datasource.write.payload.class=org.apache.hudi.common.model.DefaultHoodieRecordPayload

While OverwriteWithLatestAvroPayload precombines based on an ordering field and picks the latest record while merging, DefaultHoodieRecordPayload honors the ordering field for both precombinig and merging. Let’s understand the difference with an example:

Let’s say the ordering field is ts and record key is id and schema is:

  1. {
  2. [
  3. {"name":"id","type":"string"},
  4. {"name":"ts","type":"long"},
  5. {"name":"name","type":"string"},
  6. {"name":"price","type":"string"}
  7. ]
  8. }

Current record in storage:

  1. id ts name price
  2. 1 2 name_2 price_2

Incoming record:

  1. id ts name price
  2. 1 1 name_1 price_1

Result data after merging using OverwriteWithLatestAvroPayload (latest-write-wins):

  1. id ts name price
  2. 1 1 name_1 price_1

Result data after merging using DefaultHoodieRecordPayload (always honors ordering field):

  1. id ts name price
  2. 1 2 name_2 price_2

EventTimeAvroPayload

  1. hoodie.datasource.write.payload.class=org.apache.hudi.common.model.EventTimeAvroPayload

This is the default record payload for Flink based writing. Some use cases require merging records by event time and thus event time plays the role of an ordering field. This payload is particularly useful in the case of late-arriving data. For such use cases, users need to set the payload event time field configuration.

OverwriteNonDefaultsWithLatestAvroPayload

  1. hoodie.datasource.write.payload.class=org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload

This payload is quite similar to OverwriteWithLatestAvroPayload with slight difference while merging records. For precombining, just like OverwriteWithLatestAvroPayload, it picks the latest record for a key, based on an ordering field. While merging, it overwrites the existing record on storage only for the specified fields that don’t equal default value for that field.

PartialUpdateAvroPayload

  1. hoodie.datasource.write.payload.class=org.apache.hudi.common.model.PartialUpdateAvroPayload

This payload supports partial update. Typically, once the merge step resolves which record to pick, then the record on storage is fully replaced by the resolved record. But, in some cases, the requirement is to update only certain fields and not replace the whole record. This is called partial update. PartialUpdateAvroPayload provides out-of-box support for such use cases. To illustrate the point, let us look at a simple example:

Let’s say the ordering field is ts and record key is id and schema is:

  1. {
  2. [
  3. {"name":"id","type":"string"},
  4. {"name":"ts","type":"long"},
  5. {"name":"name","type":"string"},
  6. {"name":"price","type":"string"}
  7. ]
  8. }

Current record in storage:

  1. id ts name price
  2. 1 2 name_1 null

Incoming record:

  1. id ts name price
  2. 1 1 null price_1

Result data after merging using PartialUpdateAvroPayload:

  1. id ts name price
  2. 1 2 name_1 price_1

Summary

In this document, we highlighted the role of record payload to support fast incremental ETL with updates and deletes. We also talked about some payload implementations readily provided by Hudi. There are quite a few other implementations and developers would be interested in looking at the hierarchy of HoodieRecordPayload interface. For example, MySqlDebeziumAvroPayload and PostgresDebeziumAvroPayload provides support for seamlessly applying changes captured via Debezium for MySQL and PostgresDB. AWSDmsAvroPayload provides support for applying changes captured via Amazon Database Migration Service onto S3.

Record payloads are tunable to suit many use cases. Please check out the configurations listed here. Moreover, if users want to implement their own custom merge logic, please check out this FAQ. In a separate document, we will talk about a new record merger API for optimized payload handling.