Streaming Writes
Spark Streaming
You can write Hudi tables using spark’s structured streaming.
- Scala
- Python
// spark-shell
// prepare to stream write to new table
import org.apache.spark.sql.streaming.Trigger
val streamingTableName = "hudi_trips_cow_streaming"
val baseStreamingPath = "file:///tmp/hudi_trips_cow_streaming"
val checkpointLocation = "file:///tmp/checkpoints/hudi_trips_cow_streaming"
// create streaming df
val df = spark.readStream.
format("hudi").
load(basePath)
// write stream to new hudi table
df.writeStream.format("hudi").
options(getQuickstartWriteConfigs).
option("hoodie.datasource.write.precombine.field", "ts").
option("hoodie.datasource.write.recordkey.field", "uuid").
option("hoodie.datasource.write.partitionpath.field", "partitionpath").
option("hoodie.table.name", streamingTableName).
outputMode("append").
option("path", baseStreamingPath).
option("checkpointLocation", checkpointLocation).
trigger(Trigger.Once()).
start()
# pyspark
# prepare to stream write to new table
streamingTableName = "hudi_trips_cow_streaming"
baseStreamingPath = "file:///tmp/hudi_trips_cow_streaming"
checkpointLocation = "file:///tmp/checkpoints/hudi_trips_cow_streaming"
hudi_streaming_options = {
'hoodie.table.name': streamingTableName,
'hoodie.datasource.write.recordkey.field': 'uuid',
'hoodie.datasource.write.partitionpath.field': 'partitionpath',
'hoodie.datasource.write.table.name': streamingTableName,
'hoodie.datasource.write.operation': 'upsert',
'hoodie.datasource.write.precombine.field': 'ts',
'hoodie.upsert.shuffle.parallelism': 2,
'hoodie.insert.shuffle.parallelism': 2
}
# create streaming df
df = spark.readStream
.format("hudi")
.load(basePath)
# write stream to new hudi table
df.writeStream.format("hudi")
.options(**hudi_streaming_options)
.outputMode("append")
.option("path", baseStreamingPath)
.option("checkpointLocation", checkpointLocation)
.trigger(once=True)
.start()