5.3 Message Format

Messages (aka Records) are always written in batches. The technical term for a batch of messages is a record batch, and a record batch contains one or more records. In the degenerate case, we could have a record batch containing a single record. Record batches and records have their own headers. The format of each is described below.

5.3.1 Record Batch

The following is the on-disk format of a RecordBatch.

  baseOffset: int64
  batchLength: int32
  partitionLeaderEpoch: int32
  magic: int8 (current magic value is 2)
  crc: int32
  attributes: int16
    bit 0~2:
      0: no compression
      1: gzip
      2: snappy
      3: lz4
      4: zstd
    bit 3: timestampType
    bit 4: isTransactional (0 means not transactional)
    bit 5: isControlBatch (0 means not a control batch)
    bit 6~15: unused
  lastOffsetDelta: int32
  firstTimestamp: int64
  maxTimestamp: int64
  producerId: int64
  producerEpoch: int16
  baseSequence: int32
  records: [Record]

Note that when compression is enabled, the compressed record data is serialized directly following the count of records.
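
To make the layout concrete, here is a minimal Java sketch of reading the fixed-length header fields and the record count. It is illustrative only (the class and its names are ours, not Kafka's internals), and it assumes the buffer is positioned at the first byte of a batch:

  import java.nio.ByteBuffer;

  // Reads the RecordBatch header fields in the order listed above.
  // Kafka protocol integers are big-endian, which is ByteBuffer's default.
  final class RecordBatchHeader {
      long baseOffset;
      int batchLength;
      int partitionLeaderEpoch;
      byte magic;
      int crc;
      short attributes;
      int lastOffsetDelta;
      long firstTimestamp;
      long maxTimestamp;
      long producerId;
      short producerEpoch;
      int baseSequence;
      int recordCount; // the (possibly compressed) records follow this count

      static RecordBatchHeader parse(ByteBuffer buf) {
          RecordBatchHeader h = new RecordBatchHeader();
          h.baseOffset = buf.getLong();
          h.batchLength = buf.getInt();
          h.partitionLeaderEpoch = buf.getInt();
          h.magic = buf.get();            // must be 2 for this format
          h.crc = buf.getInt();
          h.attributes = buf.getShort();
          h.lastOffsetDelta = buf.getInt();
          h.firstTimestamp = buf.getLong();
          h.maxTimestamp = buf.getLong();
          h.producerId = buf.getLong();
          h.producerEpoch = buf.getShort();
          h.baseSequence = buf.getInt();
          h.recordCount = buf.getInt();
          return h;
      }

      int compressionType()     { return attributes & 0x07; }        // bits 0~2
      boolean isLogAppendTime() { return (attributes & 0x08) != 0; } // bit 3
      boolean isTransactional() { return (attributes & 0x10) != 0; } // bit 4
      boolean isControlBatch()  { return (attributes & 0x20) != 0; } // bit 5
  }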

The CRC covers the data from the attributes to the end of the batch (i.e. all the bytes that follow the CRC). It is located after the magic byte, which means that clients must parse the magic byte before deciding how to interpret the bytes between the batch length and the magic byte. The partition leader epoch field is not included in the CRC computation to avoid the need to recompute the CRC when this field is assigned for every batch that is received by the broker. The CRC-32C (Castagnoli) polynomial is used for the computation.
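
For example, the checksum can be verified with the JDK's built-in CRC-32C implementation. A sketch, assuming `batch` is a ByteBuffer sliced so that position 0 is the first byte of the batch and the limit is the end of the batch:

  import java.nio.ByteBuffer;
  import java.util.zip.CRC32C; // JDK 9+

  final class BatchCrc {
      // Verifies the batch CRC. The fields preceding attributes occupy
      // 8 (baseOffset) + 4 (batchLength) + 4 (partitionLeaderEpoch) +
      // 1 (magic) + 4 (crc) = 21 bytes, so the checksummed region begins at
      // offset 21, while the stored crc itself sits at offset 17.
      static boolean crcMatches(ByteBuffer batch) {
          int storedCrc = batch.getInt(17);
          ByteBuffer covered = batch.duplicate();
          covered.position(21); // attributes through the end of the batch
          CRC32C crc32c = new CRC32C();
          crc32c.update(covered);
          return (int) crc32c.getValue() == storedCrc;
      }
  }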

On compaction: unlike the older message formats, magic v2 and above preserves the first and last offset/sequence numbers from the original batch when the log is cleaned. This is required in order to be able to restore the producer's state when the log is reloaded. If we did not retain the last sequence number, for example, then after a partition leader failure, the producer might see an OutOfSequence error. The base sequence number must be preserved for duplicate checking (the broker checks incoming Produce requests for duplicates by verifying that the first and last sequence numbers of the incoming batch match those of the last batch from that producer). As a result, it is possible to have empty batches in the log when all the records in the batch are cleaned but the batch is still retained in order to preserve a producer's last sequence number. One oddity here is that the firstTimestamp field is not preserved during compaction, so it will change if the first record in the batch is compacted away.
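
The duplicate check can be pictured with a small sketch; the class and field names here are illustrative and do not correspond to the broker's actual internals:

  // Per-producer bookkeeping the broker might keep for its last appended
  // batch (hypothetical names, not Kafka's broker code).
  final class LastBatchMetadata {
      final int firstSeq; // base sequence of the last appended batch
      final int lastSeq;  // last sequence of the last appended batch

      LastBatchMetadata(int firstSeq, int lastSeq) {
          this.firstSeq = firstSeq;
          this.lastSeq = lastSeq;
      }

      // A retried batch carries the same first and last sequence numbers as
      // the copy already in the log, so it can be acknowledged without being
      // appended a second time.
      boolean isDuplicate(int incomingFirstSeq, int incomingLastSeq) {
          return incomingFirstSeq == firstSeq && incomingLastSeq == lastSeq;
      }
  }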

5.3.1.1 Control Batches

A control batch contains a single record called the control record. Control records should not be passed on to applications. Instead, they are used by consumers to filter out aborted transactional messages.

The key of a control record conforms to the following schema:

  version: int16 (current version is 0)
  type: int16 (0 indicates an abort marker, 1 indicates a commit)

The schema for the value of a control record is dependent on the type. The value is opaque to clients.
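
A consumer-side check might decode the key along these lines; a sketch, not the client's actual code:

  import java.nio.ByteBuffer;

  final class ControlRecords {
      // Decodes the control record key described above. Illustrative only.
      static String controlType(ByteBuffer key) {
          short version = key.getShort(); // current version is 0; read and ignored here
          short type = key.getShort();
          switch (type) {
              case 0:  return "ABORT";
              case 1:  return "COMMIT";
              default: return "UNKNOWN(" + type + ")"; // tolerate future types
          }
      }
  }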

5.3.2 Record

Record-level headers were introduced in Kafka 0.11.0. The on-disk format of a record with Headers is delineated below.

  length: varint
  attributes: int8
    bit 0~7: unused
  timestampDelta: varlong
  offsetDelta: varint
  keyLength: varint
  key: byte[]
  valueLen: varint
  value: byte[]
  Headers => [Header]

5.3.2.1 Record Header

  headerKeyLength: varint
  headerKey: String
  headerValueLength: varint
  Value: byte[]

We use the same varint encoding as Protobuf; more information on that encoding can be found in the Protocol Buffers documentation (https://developers.google.com/protocol-buffers/docs/encoding). The count of headers in a record is also encoded as a varint.
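
For illustration, here is a sketch of decoding one such varint, including the zig-zag step used for signed values:

  import java.nio.ByteBuffer;

  final class Varints {
      // Decodes a zig-zag-encoded signed varint: each byte carries 7 bits of
      // payload and the high bit marks continuation.
      static int readVarint(ByteBuffer buf) {
          int raw = 0;
          int shift = 0;
          byte b;
          do {
              b = buf.get();
              raw |= (b & 0x7F) << shift;
              shift += 7;
          } while ((b & 0x80) != 0);
          // Undo the zig-zag mapping: 0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, ...
          return (raw >>> 1) ^ -(raw & 1);
      }
  }

With a helper like this, keyLength, valueLen, and the header count can be read directly; a decoded length of -1 denotes a null key or value.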

5.3.3 Old Message Format

Prior to Kafka 0.11, messages were transferred and stored in message sets. In a message set, each message has its own metadata. Note that although message sets are represented as an array, they are not preceded by an int32 array size like other array elements in the protocol.

Message Set:

  MessageSet (Version: 0) => [offset message_size message]
    offset => INT64
    message_size => INT32
    message => crc magic_byte attributes key value
      crc => INT32
      magic_byte => INT8
      attributes => INT8
        bit 0~2:
          0: no compression
          1: gzip
          2: snappy
        bit 3~7: unused
      key => BYTES
      value => BYTES

  MessageSet (Version: 1) => [offset message_size message]
    offset => INT64
    message_size => INT32
    message => crc magic_byte attributes timestamp key value
      crc => INT32
      magic_byte => INT8
      attributes => INT8
        bit 0~2:
          0: no compression
          1: gzip
          2: snappy
          3: lz4
        bit 3: timestampType
          0: create time
          1: log append time
        bit 4~7: unused
      timestamp => INT64
      key => BYTES
      value => BYTES
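
Because a message set has no preceding count, a reader simply iterates until the buffer is exhausted. A minimal sketch, with illustrative names:

  import java.nio.ByteBuffer;

  final class MessageSets {
      // Walks the entries of an old-format message set. There is no array
      // count, so iteration stops when too few bytes remain for a full entry.
      static void forEachEntry(ByteBuffer set) {
          while (set.remaining() >= 12) {          // offset (8) + message_size (4)
              long offset = set.getLong();
              int messageSize = set.getInt();
              if (set.remaining() < messageSize)
                  break;                           // truncated trailing entry
              ByteBuffer message = set.slice();
              message.limit(messageSize);
              set.position(set.position() + messageSize);
              // message spans: crc, magic_byte, attributes, [timestamp,] key, value
              System.out.printf("offset %d: %d-byte message%n", offset, messageSize);
          }
      }
  }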

In versions prior to Kafka 0.10, the only supported message format version (which is indicated in the magic value) was 0. Message format version 1 was introduced with timestamp support in version 0.10.

  • Similarly to version 2 above, the lowest bits of attributes represent the compression type.
  • In version 1, the producer should always set the timestamp type bit to 0. If the topic is configured to use log append time (through either the broker-level config log.message.timestamp.type = LogAppendTime or the topic-level config message.timestamp.type = LogAppendTime), the broker will overwrite the timestamp type and the timestamp in the message set.
  • The highest bits of attributes must be set to 0.

In message format versions 0 and 1, Kafka supports recursive messages to enable compression. In this case the message's attributes must be set to indicate one of the compression types and the value field will contain a message set compressed with that type. We often refer to the nested messages as "inner messages" and the wrapping message as the "outer message." Note that the key should be null for the outer message and its offset will be the offset of the last inner message.

When receiving recursive version 0 messages, the broker decompresses them and each inner message is assigned an offset individually. In version 1, to avoid server-side re-compression, only the wrapper message will be assigned an offset. The inner messages will have relative offsets. The absolute offset can be computed using the offset from the outer message, which corresponds to the offset assigned to the last inner message.
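
The offset arithmetic is straightforward; a sketch, assuming the inner messages carry relative offsets 0..n-1:

  final class InnerOffsets {
      // Recovers the absolute offset of a version 1 inner message. The
      // wrapper's offset equals the absolute offset of the last inner message.
      static long absoluteOffset(long wrapperOffset, int lastRelativeOffset, int relativeOffset) {
          long firstAbsolute = wrapperOffset - lastRelativeOffset;
          return firstAbsolute + relativeOffset;
      }
  }

For example, a wrapper at offset 105 holding three inner messages (relative offsets 0, 1, 2) yields absolute offsets 103, 104, and 105.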

The crc field contains the CRC32 (and not CRC-32C) of the subsequent message bytes (i.e. from magic byte to the value).
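
As with the v2 example earlier, the JDK's CRC32 class can verify this; a sketch, assuming position 0 of `message` is the start of the crc field:

  import java.nio.ByteBuffer;
  import java.util.zip.CRC32;

  final class OldMessageCrc {
      // The old format uses plain CRC32 over everything after the 4-byte crc
      // field, i.e. from magic_byte to the end of the value.
      static boolean crcMatches(ByteBuffer message) {
          int storedCrc = message.getInt(0);
          ByteBuffer covered = message.duplicate();
          covered.position(4); // skip the crc field itself
          CRC32 crc32 = new CRC32();
          crc32.update(covered);
          return (int) crc32.getValue() == storedCrc;
      }
  }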