Using Spark

Hudi Streamer

The HoodieStreamer utility (part of hudi-utilities-slim-bundle and hudi-utilities-bundle) provides ways to ingest from different sources such as DFS or Kafka, with the following capabilities:

  • Exactly once ingestion of new events from Kafka, incremental imports from Sqoop or output of HiveIncrementalPuller or files under a DFS folder
  • Support for json, avro or custom record types for the incoming data
  • Manage checkpoints, rollback & recovery
  • Leverage Avro schemas from DFS or Confluent schema registry.
  • Support for plugging in transformations

Important

The following classes were renamed and relocated to the org.apache.hudi.utilities.streamer package.

  • DeltastreamerMultiWriterCkptUpdateFunc is renamed to StreamerMultiWriterCkptUpdateFunc
  • DeltaSync is renamed to StreamSync
  • HoodieDeltaStreamer is renamed to HoodieStreamer
  • HoodieDeltaStreamerMetrics is renamed to HoodieStreamerMetrics
  • HoodieMultiTableDeltaStreamer is renamed to HoodieMultiTableStreamer

To maintain backward compatibility, the original classes are still present in the org.apache.hudi.utilities.deltastreamer package, but have been deprecated.

Options

Expand this to see HoodieStreamer's "--help" output describing its capabilities in more detail.

  1. [hoodie]$ spark-submit \
  2. --packages org.apache.hudi:hudi-utilities-slim-bundle_2.12:1.0.0,org.apache.hudi:hudi-spark3.5-bundle_2.12:1.0.0 \
  3. --class org.apache.hudi.utilities.streamer.HoodieStreamer `ls packaging/hudi-utilities-slim-bundle/target/hudi-utilities-slim-bundle-*.jar` --help
  4. Usage: <main class> [options]
  5. Options:
  6. --allow-commit-on-no-checkpoint-change
  7. allow commits even if checkpoint has not changed before and after fetch
  8. datafrom source. This might be useful in sources like SqlSource where
  9. there is not checkpoint. And is not recommended to enable in continuous
  10. mode.
  11. Default: false
  12. --base-file-format
  13. File format for the base files. PARQUET (or) HFILE
  14. Default: PARQUET
  15. --bootstrap-index-class
  16. subclass of BootstrapIndex
  17. Default: org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex
  18. --bootstrap-overwrite
  19. Overwrite existing target table, default false
  20. Default: false
  21. --checkpoint
  22. Resume Hudi Streamer from this checkpoint.
  23. --cluster-scheduling-minshare
  24. Minshare for clustering as defined in
  25. https://spark.apache.org/docs/latest/job-scheduling.html
  26. Default: 0
  27. --cluster-scheduling-weight
  28. Scheduling weight for clustering as defined in
  29. https://spark.apache.org/docs/latest/job-scheduling.html
  30. Default: 1
  31. --commit-on-errors
  32. Commit even when some records failed to be written
  33. Default: false
  34. --compact-scheduling-minshare
  35. Minshare for compaction as defined in
  36. https://spark.apache.org/docs/latest/job-scheduling.html
  37. Default: 0
  38. --compact-scheduling-weight
  39. Scheduling weight for compaction as defined in
  40. https://spark.apache.org/docs/latest/job-scheduling.html
  41. Default: 1
  42. --config-hot-update-strategy-class
  43. Configuration hot update in continuous mode
  44. Default: <empty string>
  45. --continuous
  46. Hudi Streamer runs in continuous mode running source-fetch -> Transform
  47. -> Hudi Write in loop
  48. Default: false
  49. --delta-sync-scheduling-minshare
  50. Minshare for delta sync as defined in
  51. https://spark.apache.org/docs/latest/job-scheduling.html
  52. Default: 0
  53. --delta-sync-scheduling-weight
  54. Scheduling weight for delta sync as defined in
  55. https://spark.apache.org/docs/latest/job-scheduling.html
  56. Default: 1
  57. --disable-compaction
  58. Compaction is enabled for MoR table by default. This flag disables it
  59. Default: false
  60. --enable-hive-sync
  61. Enable syncing to hive
  62. Default: false
  63. --enable-sync
  64. Enable syncing meta
  65. Default: false
  66. --filter-dupes
  67. Should duplicate records from source be dropped/filtered out before
  68. insert/bulk-insert
  69. Default: false
  70. --force-empty-sync
  71. Force syncing meta even on empty commit
  72. Default: false
  73. --help, -h
  74. --hoodie-conf
  75. Any configuration that can be set in the properties file (using the CLI
  76. parameter "--props") can also be passed command line using this
  77. parameter. This can be repeated
  78. Default: []
  79. --ingestion-metrics-class
  80. Ingestion metrics class for reporting metrics during ingestion
  81. lifecycles.
  82. Default: org.apache.hudi.utilities.streamer.HoodieStreamerMetrics
  83. --initial-checkpoint-provider
  84. subclass of
  85. org.apache.hudi.utilities.checkpointing.InitialCheckpointProvider.
  86. Generate check point for Hudi Streamer for the first run. This field
  87. will override the checkpoint of last commit using the checkpoint field.
  88. Use this field only when switching source, for example, from DFS source
  89. to Kafka Source.
  90. --max-pending-clustering
  91. Maximum number of outstanding inflight/requested clustering. Delta Sync
  92. will not happen unlessoutstanding clustering is less than this number
  93. Default: 5
  94. --max-pending-compactions
  95. Maximum number of outstanding inflight/requested compactions. Delta Sync
  96. will not happen unlessoutstanding compactions is less than this number
  97. Default: 5
  98. --max-retry-count
  99. the max retry count if --retry-on-source-failures is enabled
  100. Default: 3
  101. --min-sync-interval-seconds
  102. the min sync interval of each sync in continuous mode
  103. Default: 0
  104. --op
  105. Takes one of these values : UPSERT (default), INSERT, BULK_INSERT,
  106. INSERT_OVERWRITE, INSERT_OVERWRITE_TABLE, DELETE_PARTITION
  107. Default: UPSERT
  108. Possible Values: [INSERT, INSERT_PREPPED, UPSERT, UPSERT_PREPPED, BULK_INSERT, BULK_INSERT_PREPPED, DELETE, DELETE_PREPPED, BOOTSTRAP, INSERT_OVERWRITE, CLUSTER, DELETE_PARTITION, INSERT_OVERWRITE_TABLE, COMPACT, INDEX, ALTER_SCHEMA, LOG_COMPACT, UNKNOWN]
  109. --payload-class
  110. subclass of HoodieRecordPayload, that works off a GenericRecord.
  111. Implement your own, if you want to do something other than overwriting
  112. existing value
  113. Default: org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
  114. --post-write-termination-strategy-class
  115. Post writer termination strategy class to gracefully shutdown
  116. deltastreamer in continuous mode
  117. Default: <empty string>
  118. --props
  119. path to properties file on localfs or dfs, with configurations for
  120. hoodie client, schema provider, key generator and data source. For
  121. hoodie client props, sane defaults are used, but recommend use to
  122. provide basic things like metrics endpoints, hive configs etc. For
  123. sources, referto individual classes, for supported properties.
  124. Properties in this file can be overridden by "--hoodie-conf"
  125. Default: file:///Users/shiyanxu/src/test/resources/streamer-config/dfs-source.properties
  126. --retry-interval-seconds
  127. the retry interval for source failures if --retry-on-source-failures is
  128. enabled
  129. Default: 30
  130. --retry-last-pending-inline-clustering, -rc
  131. Retry last pending inline clustering plan before writing to sink.
  132. Default: false
  133. --retry-last-pending-inline-compaction
  134. Retry last pending inline compaction plan before writing to sink.
  135. Default: false
  136. --retry-on-source-failures
  137. Retry on any source failures
  138. Default: false
  139. --run-bootstrap
  140. Run bootstrap if bootstrap index is not found
  141. Default: false
  142. --schemaprovider-class
  143. subclass of org.apache.hudi.utilities.schema.SchemaProvider to attach
  144. schemas to input & target table data, built in options:
  145. org.apache.hudi.utilities.schema.FilebasedSchemaProvider.Source (See
  146. org.apache.hudi.utilities.sources.Source) implementation can implement
  147. their own SchemaProvider. For Sources that return Dataset<Row>, the
  148. schema is obtained implicitly. However, this CLI option allows
  149. overriding the schemaprovider returned by Source.
  150. --source-class
  151. Subclass of org.apache.hudi.utilities.sources to read data. Built-in
  152. options: org.apache.hudi.utilities.sources.{JsonDFSSource (default),
  153. AvroDFSSource, JsonKafkaSource, AvroKafkaSource, HiveIncrPullSource}
  154. Default: org.apache.hudi.utilities.sources.JsonDFSSource
  155. --source-limit
  156. Maximum amount of data to read from source. Default: No limit, e.g:
  157. DFS-Source => max bytes to read, Kafka-Source => max events to read
  158. Default: 9223372036854775807
  159. --source-ordering-field
  160. Field within source record to decide how to break ties between records
  161. with same key in input data. Default: 'ts' holding unix timestamp of
  162. record
  163. Default: ts
  164. --spark-master
  165. spark master to use, if not defined inherits from your environment
  166. taking into account Spark Configuration priority rules (e.g. not using
  167. spark-submit command).
  168. Default: <empty string>
  169. --sync-tool-classes
  170. Meta sync client tool, using comma to separate multi tools
  171. Default: org.apache.hudi.hive.HiveSyncTool
  172. * --table-type
  173. Type of table. COPY_ON_WRITE (or) MERGE_ON_READ
  174. * --target-base-path
  175. base path for the target hoodie table. (Will be created if did not exist
  176. first time around. If exists, expected to be a hoodie table)
  177. * --target-table
  178. name of the target table
  179. --transformer-class
  180. A subclass or a list of subclasses of
  181. org.apache.hudi.utilities.transform.Transformer. Allows transforming raw
  182. source Dataset to a target Dataset (conforming to target schema) before
  183. writing. Default : Not set. E.g. -
  184. org.apache.hudi.utilities.transform.SqlQueryBasedTransformer (which
  185. allows a SQL query templated to be passed as a transformation function).
  186. Pass a comma-separated list of subclass names to chain the
  187. transformations. If there are two or more transformers using the same
  188. config keys and expect different values for those keys, then transformer
  189. can include an identifier. E.g. -
  190. tr1:org.apache.hudi.utilities.transform.SqlQueryBasedTransformer. Here
  191. the identifier tr1 can be used along with property key like
  192. `hoodie.streamer.transformer.sql.tr1` to identify properties related to
  193. the transformer. So effective value for
  194. `hoodie.streamer.transformer.sql` is determined by key
  195. `hoodie.streamer.transformer.sql.tr1` for this transformer. If
  196. identifier is used, it should be specified for all the transformers.
  197. Further the order in which transformer is applied is determined by the
  198. occurrence of transformer irrespective of the identifier used for the
  199. transformer. For example: In the configured value below tr2:org.apache.hudi.utilities.transform.SqlQueryBasedTransformer,tr1:org.apache.hudi.utilities.transform.SqlQueryBasedTransformer
  200. , tr2 is applied before tr1 based on order of occurrence.

The tool takes a hierarchically composed property file and has pluggable interfaces for extracting data, key generation and providing schema. Sample configs for ingesting from Kafka and DFS are provided under hudi-utilities/src/test/resources/streamer-config.

For example, once you have Confluent Kafka and the Schema Registry up and running, produce some test data using the command below (impressions.avro is provided by the schema-registry repo)

  1. [confluent-5.0.0]$ bin/ksql-datagen schema=../impressions.avro format=avro topic=impressions key=impressionid

and then ingest it as follows.

  1. [hoodie]$ spark-submit \
  2. --packages org.apache.hudi:hudi-utilities-slim-bundle_2.12:1.0.0,org.apache.hudi:hudi-spark3.5-bundle_2.12:1.0.0 \
  3. --class org.apache.hudi.utilities.streamer.HoodieStreamer `ls packaging/hudi-utilities-slim-bundle/target/hudi-utilities-slim-bundle-*.jar` \
  4. --props file://${PWD}/hudi-utilities/src/test/resources/streamer-config/kafka-source.properties \
  5. --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  6. --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
  7. --source-ordering-field impresssiontime \
  8. --target-base-path file:///tmp/hudi-streamer-op \
  9. --target-table uber.impressions \
  10. --op BULK_INSERT

In some cases, you may want to migrate your existing table into Hudi beforehand. Please refer to the migration guide.

Using the hudi-utilities-slim-bundle jar

It is recommended to use hudi-utilities-slim-bundle together with the Hudi Spark bundle that matches the Spark version in use, e.g., --packages org.apache.hudi:hudi-utilities-slim-bundle_2.12:1.0.0,org.apache.hudi:hudi-spark3.5-bundle_2.12:1.0.0.

Concurrency Control

Using optimistic concurrency control (OCC) via Hudi Streamer requires adding the configs below to the properties file passed to the job.

  1. hoodie.write.concurrency.mode=optimistic_concurrency_control
  2. hoodie.write.lock.provider=<lock-provider-classname>
  3. hoodie.cleaner.policy.failed.writes=LAZY
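
For example, a minimal sketch using the ZooKeeper-based lock provider might look like the following; the ZooKeeper endpoint, lock key and base path are placeholders, and the exact lock-provider configs should be verified against the concurrency control docs for your Hudi version.

  hoodie.write.concurrency.mode=optimistic_concurrency_control
  hoodie.cleaner.policy.failed.writes=LAZY
  hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
  hoodie.write.lock.zookeeper.url=zk-host
  hoodie.write.lock.zookeeper.port=2181
  hoodie.write.lock.zookeeper.lock_key=my_table_lock
  hoodie.write.lock.zookeeper.base_path=/hudi/write_locks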

As an example, adding the configs to the kafka-source.properties file and passing them to Hudi Streamer will enable OCC. A Hudi Streamer job can then be triggered as follows:

  1. [hoodie]$ spark-submit \
  2. --packages org.apache.hudi:hudi-utilities-slim-bundle_2.12:1.0.0,org.apache.hudi:hudi-spark3.5-bundle_2.12:1.0.0 \
  3. --class org.apache.hudi.utilities.streamer.HoodieStreamer `ls packaging/hudi-utilities-slim-bundle/target/hudi-utilities-slim-bundle-*.jar` \
  4. --props file://${PWD}/hudi-utilities/src/test/resources/streamer-config/kafka-source.properties \
  5. --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  6. --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
  7. --source-ordering-field impresssiontime \
  8. --target-base-path file:///tmp/hudi-streamer-op \
  9. --target-table uber.impressions \
  10. --op BULK_INSERT

Read more in depth about concurrency control in the concurrency control concepts section.

Checkpointing

HoodieStreamer uses checkpoints to keep track of what data has already been read, so it can resume without reprocessing all data. When using a Kafka source, the checkpoint is the Kafka offset. When using a DFS source, the checkpoint is the 'last modified' timestamp of the latest file read. Checkpoints are saved in the .hoodie commit file as streamer.checkpoint.key.

If you need to change the checkpoints for reprocessing or replaying data you can use the following options:

  • --checkpoint will set streamer.checkpoint.reset_key in the commit file to overwrite the current checkpoint. The format of the checkpoint depends on KAFKA_CHECKPOINT_TYPE. By default (for type string), the checkpoint should be provided as: topicName,0:offset0,1:offset1,2:offset2. For type timestamp, the checkpoint should be provided as the long value of the desired timestamp. For type single_offset, we assume that the topic consists of a single partition, so the checkpoint should be provided as the long value of the desired offset. A sample set of flags is sketched after this list.
  • --source-limit will set a maximum amount of data to read from the source. For DFS sources, this is max # of bytes read. For Kafka, this is the max # of events to read.
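
As an illustration, a run that resets the Kafka checkpoint might add flags like the following to the spark-submit commands shown earlier; the topic name, offsets and limit are placeholders.

  --checkpoint impressions,0:100,1:120,2:90 \
  --source-limit 5000000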

Transformers

HoodieStreamer supports custom transformation of records before writing to storage. This is done by supplying an implementation of org.apache.hudi.utilities.transform.Transformer via the --transformer-class option.

SQL Query Transformer

You can pass a SQL Query to be executed during write.

  1. --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer
  2. --hoodie-conf hoodie.streamer.transformer.sql=SELECT a.col1, a.col3, a.col4 FROM <SRC> a

SQL File Transformer

You can specify a file with a SQL script to be executed during write. The SQL file is configured with this hoodie property: hoodie.streamer.transformer.sql.file

The query should reference the source as a table named "<SRC>".

The final SQL statement's result is used as the write payload.

Example Spark SQL Query:

  1. CACHE TABLE tmp_personal_trips AS
  2. SELECT * FROM <SRC> WHERE trip_type='personal_trips';
  3. SELECT * FROM tmp_personal_trips;
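
To wire this up, a job would typically pass the file-based SQL transformer along with the property above; the transformer class name below is the built-in file-based variant expected in recent Hudi releases, and the DFS path is a placeholder.

  --transformer-class org.apache.hudi.utilities.transform.SqlFileBasedTransformer
  --hoodie-conf hoodie.streamer.transformer.sql.file=hdfs://xxx/transform.sql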

Flattening Transformer

This transformer can flatten nested objects. It flattens the nested fields in the incoming records by prefixing inner-fields with outer-field and _ in a nested fashion. Currently flattening of arrays is not supported.

An example schema may look something like the one below, where name is a nested field of StructType in the original source:

  1. age as intColumn,address as stringColumn,name.first as name_first,name.last as name_last, name.middle as name_middle

Set the config as:

  1. --transformer-class org.apache.hudi.utilities.transform.FlatteningTransformer

Chained Transformer

If you wish to use multiple transformers together, you can chain them so that they are executed sequentially.

The example below first flattens the incoming records and then does a SQL projection based on the specified query:

  1. --transformer-class org.apache.hudi.utilities.transform.FlatteningTransformer,org.apache.hudi.utilities.transform.SqlQueryBasedTransformer
  2. --hoodie-conf hoodie.streamer.transformer.sql=SELECT a.col1, a.col3, a.col4 FROM <SRC> a

AWS DMS Transformer

This transformer is specific to AWS DMS data. It adds an Op field with value I if the field is not present.

Set the config as:

  1. --transformer-class org.apache.hudi.utilities.transform.AWSDmsTransformer

Custom Transformer Implementation

You can write your own custom transformer by implementing the org.apache.hudi.utilities.transform.Transformer interface, as sketched below.
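
Here is a minimal sketch of a custom transformer. The interface signature and import paths reflect recent Hudi versions and should be checked against your release; the event_ts column name is purely illustrative.

  import org.apache.hudi.common.config.TypedProperties;
  import org.apache.hudi.utilities.transform.Transformer;

  import org.apache.spark.api.java.JavaSparkContext;
  import org.apache.spark.sql.Dataset;
  import org.apache.spark.sql.Row;
  import org.apache.spark.sql.SparkSession;

  // Hypothetical transformer that drops records whose event_ts column is null.
  public class DropMissingTimestampTransformer implements Transformer {

    @Override
    public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession,
                              Dataset<Row> rowDataset, TypedProperties properties) {
      // Filter out rows with a missing event timestamp before they are written to Hudi.
      return rowDataset.filter("event_ts IS NOT NULL");
    }
  }

Package the class into a jar, add it to the spark-submit classpath (e.g. via --jars), and pass its fully-qualified name through --transformer-class.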

Schema Providers

By default, Spark will infer the schema of the source and use that inferred schema when writing to a table. If you need to explicitly define the schema you can use one of the following Schema Providers below.

Schema Registry Provider

You can obtain the latest schema from an online registry. You pass a URL to the registry and, if needed, you can also pass userinfo and credentials in the URL, like https://foo:bar@schemaregistry.org. The credentials are then extracted and set on the request as an Authorization header.

When fetching schemas from a registry, you can specify both the source schema and the target schema separately.

Config | Description | Example
hoodie.streamer.schemaprovider.registry.url | The schema of the source you are reading from | https://foo:bar@schemaregistry.org
hoodie.streamer.schemaprovider.registry.targetUrl | The schema of the target you are writing to | https://foo:bar@schemaregistry.org

The above configs are passed to the Hudi Streamer spark-submit command like:

  1. --hoodie-conf hoodie.streamer.schemaprovider.registry.url=https://foo:bar@schemaregistry.org

There are other optional configs for the schema registry provider, such as SSL store related configs and support for custom transformation of the schema returned by the registry, e.g., converting the original JSON schema to an Avro schema via org.apache.hudi.utilities.schema.converter.JsonToAvroSchemaConverter.

Config | Description | Example
hoodie.streamer.schemaprovider.registry.schemaconverter | The class name of the custom schema converter to use | org.apache.hudi.utilities.schema.converter.JsonToAvroSchemaConverter
schema.registry.ssl.keystore.location | SSL key store location |
schema.registry.ssl.keystore.password | SSL key store password |
schema.registry.ssl.truststore.location | SSL trust store location |
schema.registry.ssl.truststore.password | SSL trust store password |
schema.registry.ssl.key.password | SSL key password |

JDBC Schema Provider

You can obtain the latest schema through a JDBC connection.

Config | Description | Example
hoodie.streamer.schemaprovider.source.schema.jdbc.connection.url | The JDBC URL to connect to. You can specify source-specific connection properties in the URL | jdbc:postgresql://localhost/test?user=fred&password=secret
hoodie.streamer.schemaprovider.source.schema.jdbc.driver.type | The class name of the JDBC driver to use to connect to this URL | org.h2.Driver
hoodie.streamer.schemaprovider.source.schema.jdbc.username | Username for the connection | fred
hoodie.streamer.schemaprovider.source.schema.jdbc.password | Password for the connection | secret
hoodie.streamer.schemaprovider.source.schema.jdbc.dbtable | The table with the schema to reference | test_database.test1_table or test1_table
hoodie.streamer.schemaprovider.source.schema.jdbc.timeout | The number of seconds the driver will wait for a Statement object to execute. Zero means there is no limit. In the write path, this option depends on how JDBC drivers implement the API setQueryTimeout, e.g., the h2 JDBC driver checks the timeout of each query instead of an entire JDBC batch. Defaults to 0. | 0
hoodie.streamer.schemaprovider.source.schema.jdbc.nullable | If true, all columns are nullable | true

The above configs are passed to the Hudi Streamer spark-submit command like: --hoodie-conf hoodie.streamer.jdbcbasedschemaprovider.connection.url=jdbc:postgresql://localhost/test?user=fred&password=secret

File Based Schema Provider

You can use a .avsc file to define your schema. You can then point to this file on DFS as a schema provider.

Config | Description | Example
hoodie.streamer.schemaprovider.source.schema.file | The schema of the source you are reading from | example schema file
hoodie.streamer.schemaprovider.target.schema.file | The schema of the target you are writing to | example schema file
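
For instance, flags along these lines point the FilebasedSchemaProvider (listed in the --help output above) at .avsc files on DFS; the paths are placeholders.

  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider
  --hoodie-conf hoodie.streamer.schemaprovider.source.schema.file=hdfs://xxx/source.avsc
  --hoodie-conf hoodie.streamer.schemaprovider.target.schema.file=hdfs://xxx/target.avsc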

Hive Schema Provider

You can use Hive tables to fetch the source and target schemas.

Config | Description
hoodie.streamer.schemaprovider.source.schema.hive.database | Hive database from where source schema can be fetched
hoodie.streamer.schemaprovider.source.schema.hive.table | Hive table from where source schema can be fetched
hoodie.streamer.schemaprovider.target.schema.hive.database | Hive database from where target schema can be fetched
hoodie.streamer.schemaprovider.target.schema.hive.table | Hive table from where target schema can be fetched
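
A sketch of passing these, assuming the provider class is org.apache.hudi.utilities.schema.HiveSchemaProvider (verify the class name for your Hudi version; the database and table names are placeholders):

  --schemaprovider-class org.apache.hudi.utilities.schema.HiveSchemaProvider
  --hoodie-conf hoodie.streamer.schemaprovider.source.schema.hive.database=src_db
  --hoodie-conf hoodie.streamer.schemaprovider.source.schema.hive.table=src_table
  --hoodie-conf hoodie.streamer.schemaprovider.target.schema.hive.database=tgt_db
  --hoodie-conf hoodie.streamer.schemaprovider.target.schema.hive.table=tgt_table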

Schema Provider with Post Processor

The SchemaProviderWithPostProcessor will extract the schema from one of the previously mentioned schema providers and then apply a post processor to change the schema before it is used. You can write your own post processor by extending SchemaPostProcessor: https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaPostProcessor.java

Sources

Hoodie Streamer can read data from a wide variety of sources. The following is a list of supported sources:

Distributed File System (DFS)

See the storage configurations page for examples of the DFS systems Hudi can read from. The following file formats are supported for reading/writing with DFS sources. (Note: you can still use Spark/Flink readers to read other formats and then write the data in Hudi format.)

  • CSV
  • AVRO
  • JSON
  • PARQUET
  • ORC
  • HUDI

For DFS sources the following behaviors are expected:

  • For the JSON DFS source, you always need to set a schema. If the target Hudi table follows the same schema as the source file, you only need to set the source schema; if not, set schemas for both source and target.
  • HoodieStreamer reads the files under the source base path (hoodie.streamer.source.dfs.root) directly, and it won't use the partition paths under this base path as fields of the dataset. Detailed examples can be found here; a minimal properties sketch follows this list.
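
For example, a minimal properties sketch for a JSON DFS source (using the File Based Schema Provider configs described above) might look like the following; the paths are placeholders.

  hoodie.streamer.source.dfs.root=hdfs://xxx/input/json
  hoodie.streamer.schemaprovider.source.schema.file=hdfs://xxx/source.avsc
  hoodie.streamer.schemaprovider.target.schema.file=hdfs://xxx/target.avsc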

Kafka

Hudi can read directly from Kafka clusters. See more details on HoodieStreamer to learn how to set up streaming ingestion with exactly-once semantics, checkpointing, and plug-in transformations. The following formats are supported when reading data from Kafka:

  • AVRO: org.apache.hudi.utilities.sources.AvroKafkaSource
  • JSON: org.apache.hudi.utilities.sources.JsonKafkaSource
  • Proto: org.apache.hudi.utilities.sources.ProtoKafkaSource

Check out Kafka source config for more details.
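
As a rough sketch, a kafka-source.properties for the Kafka sources typically carries at least the topic, the broker list and, for the Avro source, the schema registry URL. The hoodie.streamer.source.kafka.topic key below reflects the hoodie.streamer.* naming used on this page and should be verified against the Kafka source configs for your version; the broker and registry endpoints are placeholders.

  hoodie.streamer.source.kafka.topic=impressions
  bootstrap.servers=kafka-broker:9092
  auto.offset.reset=earliest
  hoodie.streamer.schemaprovider.registry.url=https://schemaregistry.org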

Pulsar

HoodieStreamer also supports ingesting from Apache Pulsar via org.apache.hudi.utilities.sources.PulsarSource. Check out Pulsar source config for more details.

Cloud storage event sources

AWS S3 storage provides an event notification service which will post notifications when certain events happen in your S3 bucket (see https://docs.aws.amazon.com/AmazonS3/latest/userguide/NotificationHowTo.html). AWS will put these events in a Simple Queue Service (SQS) queue. Apache Hudi provides S3EventsSource and S3EventsHoodieIncrSource that can read from SQS to trigger/process new or changed data as soon as it is available on S3. Check out S3 source configs for more details.

Similar to S3 event source, Google Cloud Storage (GCS) event source is also supported via GcsEventsSource and GcsEventsHoodieIncrSource. Check out GCS events source configs for more details.

AWS Setup
  1. Enable S3 Event Notifications https://docs.aws.amazon.com/AmazonS3/latest/userguide/NotificationHowTo.html
  2. Download the aws-java-sdk-sqs jar.
  3. Find the queue URL and Region to set these configurations:
    1. hoodie.streamer.s3.source.queue.url=https://sqs.us-west-2.amazonaws.com/queue/url
    2. hoodie.streamer.s3.source.queue.region=us-west-2
  4. Start the S3EventsSource and S3EventsHoodieIncrSource using the HoodieStreamer utility as shown in sample commands below:

A sketch of the S3EventsSource job is shown below; the second job, which uses S3EventsHoodieIncrSource to ingest the actual data, follows the same pattern with additional source configs. Full configuration and setup is described in this blog: https://hudi.apache.org/blog/2021/08/23/s3-events-source/#configuration-and-setup
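
The command below is a hedged sketch that reuses the flags from the earlier examples; the target path, table name and write operation are placeholders and should be adapted from the blog above.

  [hoodie]$ spark-submit \
  --packages org.apache.hudi:hudi-utilities-slim-bundle_2.12:1.0.0,org.apache.hudi:hudi-spark3.5-bundle_2.12:1.0.0 \
  --class org.apache.hudi.utilities.streamer.HoodieStreamer `ls packaging/hudi-utilities-slim-bundle/target/hudi-utilities-slim-bundle-*.jar` \
  --source-class org.apache.hudi.utilities.sources.S3EventsSource \
  --hoodie-conf hoodie.streamer.s3.source.queue.url=https://sqs.us-west-2.amazonaws.com/queue/url \
  --hoodie-conf hoodie.streamer.s3.source.queue.region=us-west-2 \
  --target-base-path file:///tmp/s3-event-metadata \
  --target-table s3_event_metadata \
  --op INSERT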

JDBC Source

Hudi can read from a JDBC source with a full fetch of a table, or incrementally with checkpointing.

Config | Description | Example
hoodie.streamer.jdbc.url | URL of the JDBC connection | jdbc:postgresql://localhost/test
hoodie.streamer.jdbc.user | User to use for authentication of the JDBC connection | fred
hoodie.streamer.jdbc.password | Password to use for authentication of the JDBC connection | secret
hoodie.streamer.jdbc.password.file | If you prefer to use a password file for the connection |
hoodie.streamer.jdbc.driver.class | Driver class to use for the JDBC connection |
hoodie.streamer.jdbc.table.name | Name of the table to read from | my_table
hoodie.streamer.jdbc.table.incr.column.name | If run in incremental mode, this field will be used to pull new data incrementally |
hoodie.streamer.jdbc.incr.pull | Will the JDBC connection perform an incremental pull? |
hoodie.streamer.jdbc.extra.options. | How you pass extra configurations that would normally be specified as spark.read.option() | hoodie.streamer.jdbc.extra.options.fetchSize=100 hoodie.streamer.jdbc.extra.options.upperBound=1 hoodie.streamer.jdbc.extra.options.lowerBound=100
hoodie.streamer.jdbc.storage.level | Used to control the persistence level | Default: MEMORY_AND_DISK_SER
hoodie.streamer.jdbc.incr.fallback.to.full.fetch | Boolean which, if set to true, makes an incremental fetch fall back to a full fetch if there is any error in the incremental read | FALSE
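
Putting a few of these together, a minimal properties sketch for an incremental pull might look like the following, paired with --source-class org.apache.hudi.utilities.sources.JdbcSource on the command line; the connection details and incremental column are placeholders.

  hoodie.streamer.jdbc.url=jdbc:postgresql://localhost/test
  hoodie.streamer.jdbc.user=fred
  hoodie.streamer.jdbc.password=secret
  hoodie.streamer.jdbc.driver.class=org.postgresql.Driver
  hoodie.streamer.jdbc.table.name=my_table
  hoodie.streamer.jdbc.incr.pull=true
  hoodie.streamer.jdbc.table.incr.column.name=updated_at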

SQL Sources

The SQL source, org.apache.hudi.utilities.sources.SqlSource, reads from any table and is used mainly for backfill jobs that process specific partition dates. It won't update streamer.checkpoint.key to the processed commit; instead, it fetches the latest successful checkpoint key and sets that value as this backfill commit's checkpoint so that it won't interrupt the regular incremental processing. To fetch and use the latest incremental checkpoint, you also need to set this hoodie conf for Hudi Streamer jobs: hoodie.write.meta.key.prefixes = 'streamer.checkpoint.key'

The Spark SQL query should be configured using this hoodie config: hoodie.streamer.source.sql.sql.query = 'select * from source_table'

Using org.apache.hudi.utilities.sources.SqlFileBasedSource allows setting the SQL queries in a file to read from any table. The SQL file path should be configured using this hoodie config: hoodie.streamer.source.sql.file = 'hdfs://xxx/source.sql'
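
For instance, a backfill run with the SQL source might add flags like the following to the spark-submit command; the query is a placeholder.

  --source-class org.apache.hudi.utilities.sources.SqlSource \
  --hoodie-conf "hoodie.streamer.source.sql.sql.query=select * from source_table" \
  --hoodie-conf hoodie.write.meta.key.prefixes=streamer.checkpoint.key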

Error Table

HoodieStreamer supports segregating error records into a separate table called the "Error Table", alongside the target data table. This allows easy integration with dead-letter queues (DLQ). The Error Table is supported via a user-provided subclass of org.apache.hudi.utilities.streamer.BaseErrorTableWriter, supplied via the config hoodie.errortable.write.class. Check out more in org.apache.hudi.config.HoodieErrorTableConfig.
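
A hedged sketch of the relevant configs is shown below; hoodie.errortable.write.class is the config named above, the writer class is a hypothetical user implementation, and the enable/base-path keys are typical HoodieErrorTableConfig options that should be verified against your Hudi version.

  hoodie.errortable.enable=true
  hoodie.errortable.base.path=/path/to/error_table
  hoodie.errortable.write.class=com.example.MyErrorTableWriter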

Termination Strategy

Users can configure a post-write termination strategy under continuous mode if need be. For instance, users can configure a graceful shutdown if there is no new data from the configured source for 5 consecutive sync rounds. Here is the interface for the termination strategy:

  1. /**
  2. * Post write termination strategy for deltastreamer in continuous mode.
  3. */
  4. public interface PostWriteTerminationStrategy {
  5. /**
  6. * Returns whether HoodieStreamer needs to be shutdown.
  7. * @param scheduledCompactionInstantAndWriteStatuses optional pair of scheduled compaction instant and write statuses.
  8. * @return true if HoodieStreamer has to be shutdown. false otherwise.
  9. */
  10. boolean shouldShutdown(Option<Pair<Option<String>, JavaRDD<WriteStatus>>> scheduledCompactionInstantAndWriteStatuses);
  11. }

Also, this might help in bootstrapping a new table. Instead of doing one bulk load or bulk_insert leveraging a large cluster for a large input of data, one could start HoodieStreamer in continuous mode and add a shutdown strategy to terminate once all data has been bootstrapped. This way, each batch could be smaller and may not need a large cluster to bootstrap the data. There is a concrete implementation provided out of the box, NoNewDataTerminationStrategy, which can be wired in as shown below; users are free to implement their own strategy as they see fit.
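
For example, wiring in the out-of-the-box strategy would look roughly like this; the package shown is assumed to be org.apache.hudi.utilities.streamer and should be checked against your Hudi version.

  --continuous \
  --post-write-termination-strategy-class org.apache.hudi.utilities.streamer.NoNewDataTerminationStrategy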

Dynamic configuration updates

When Hoodie Streamer is running in continuous mode, the properties can be refreshed/updated before each sync call. Interested users can implement org.apache.hudi.utilities.deltastreamer.ConfigurationHotUpdateStrategy to leverage this.
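
The strategy implementation is passed via the --config-hot-update-strategy-class option listed in the --help output above; the class name here is a hypothetical user implementation.

  --continuous \
  --config-hot-update-strategy-class com.example.MyConfigHotUpdateStrategy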

MultiTableStreamer

HoodieMultiTableStreamer, an extension of HoodieStreamer, facilitates the simultaneous ingestion of multiple tables into Hudi datasets. At present, it supports the sequential ingestion of tables and accommodates both COPY_ON_WRITE and MERGE_ON_READ storage types. The command line parameters for HoodieMultiTableStreamer largely mirror those of HoodieStreamer, with the notable difference being the necessity to supply table-specific configurations in separate files in a dedicated config folder. New command line options have been introduced to support this functionality:

  1. * --config-folder
  2. the path to the folder which contains all the table wise config files
  3. --base-path-prefix
  4. this is added to enable users to create all the Hudi datasets for related tables under one path in the FS. The datasets are then created under the path <base_path_prefix>/<database>/<table_to_be_ingested>. However, you can override the path for every table by setting the property hoodie.streamer.ingestion.targetBasePath

The following properties need to be set properly to ingest data using HoodieMultiTableStreamer.

  1. hoodie.streamer.ingestion.tablesToBeIngested
  2. comma separated names of tables to be ingested in the format <database>.<table>, for example db1.table1,db1.table2
  3. hoodie.streamer.ingestion.targetBasePath
  4. if you wish to ingest a particular table in a separate path, you can mention that path here
  5. hoodie.streamer.ingestion.<database>.<table>.configFile
  6. path to the config file in the dedicated config folder which contains the overridden properties for the particular table to be ingested.

Sample config files for table-wise overridden properties can be found under hudi-utilities/src/test/resources/streamer-config. The command to run HoodieMultiTableStreamer is similar to how you run HoodieStreamer:

  1. [hoodie]$ spark-submit \
  2. --packages org.apache.hudi:hudi-utilities-slim-bundle_2.12:1.0.0,org.apache.hudi:hudi-spark3.5-bundle_2.12:1.0.0 \
  3. --class org.apache.hudi.utilities.streamer.HoodieMultiTableStreamer `ls packaging/hudi-utilities-slim-bundle/target/hudi-utilities-slim-bundle-*.jar` \
  4. --props file://${PWD}/hudi-utilities/src/test/resources/streamer-config/kafka-source.properties \
  5. --config-folder file:///tmp/hudi-ingestion-config \
  6. --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  7. --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
  8. --source-ordering-field impresssiontime \
  9. --base-path-prefix file:///tmp/hudi-streamer-op \
  10. --target-table uber.impressions \
  11. --op BULK_INSERT

For detailed information on how to configure and use HoodieMultiTableStreamer, please refer to the blog section.