Hadoop FileSystem Connector
The BucketingSink has been deprecated since Flink 1.9 and will be removed in subsequent releases. Please use the StreamingFileSink instead.
This connector provides a Sink that writes partitioned files to any filesystem supported by Hadoop FileSystem. To use this connector, add the following dependency to your project:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-filesystem_2.11</artifactId>
  <version>1.9.0</version>
</dependency>
Note that the streaming connectors are currently not part of the binary distribution. See here for information about how to package the program with the libraries for cluster execution.
Bucketing File Sink
The bucketing behaviour as well as the writing can be configured, but we will get to that later. This is how you can create a bucketing sink which, by default, sinks to rolling files that are split by time:
DataStream<String> input = ...;
input.addSink(new BucketingSink<String>("/base/path"));
val input: DataStream[String] = ...
input.addSink(new BucketingSink[String]("/base/path"))
The only required parameter is the base path where the buckets will be stored. The sink can be further configured by specifying a custom bucketer, writer and batch size.
By default the bucketing sink will split by the current system time when elements arrive and will use the datetime pattern "yyyy-MM-dd--HH" to name the buckets. This pattern is passed to DateTimeFormatter with the current system time and the JVM's default timezone to form a bucket path. Users can also specify a timezone for the bucketer to use when formatting the bucket path. A new bucket will be created whenever a new date is encountered. For example, if you have a pattern that contains minutes as the finest granularity you will get a new bucket every minute. Each bucket is itself a directory that contains several part files: each parallel instance of the sink will create its own part file and when part files get too big the sink will also create a new part file next to the others. When a bucket becomes inactive, the open part file will be flushed and closed. A bucket is regarded as inactive when it hasn't been written to recently. By default, the sink checks for inactive buckets every minute and closes any buckets which haven't been written to for over a minute. This behaviour can be configured with setInactiveBucketCheckInterval() and setInactiveBucketThreshold() on a BucketingSink.
You can also specify a custom bucketer by using setBucketer() on a BucketingSink. If desired, the bucketer can use a property of the element or tuple to determine the bucket directory.
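A minimal sketch of such a custom bucketer, assuming the Bucketer interface's getBucketPath(Clock, Path, T) contract, could group elements by a tuple field instead of by time (the class name KeyBucketer and the key layout are hypothetical):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;
import org.apache.hadoop.fs.Path;

// hypothetical bucketer that groups elements by the first tuple field instead of by time
public class KeyBucketer implements Bucketer<Tuple2<String, Integer>> {

  private static final long serialVersionUID = 1L;

  @Override
  public Path getBucketPath(Clock clock, Path basePath, Tuple2<String, Integer> element) {
    // produces bucket directories like /base/path/key=foo
    return new Path(basePath, "key=" + element.f0);
  }
}

It would then be registered on the sink with sink.setBucketer(new KeyBucketer()).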
The default writer is StringWriter. This will call toString() on the incoming elements and write them to part files, separated by newline. To specify a custom writer use setWriter() on a BucketingSink. If you want to write Hadoop SequenceFiles you can use the provided SequenceFileWriter, which can also be configured to use compression.
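For instance, assuming the SequenceFileWriter constructor that takes a Hadoop codec name and a compression type, a compressed writer could be configured roughly like this (the chosen codec, Snappy here, must be available on the cluster):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

BucketingSink<Tuple2<IntWritable, Text>> sink =
    new BucketingSink<Tuple2<IntWritable, Text>>("/base/path");

// block-compressed SequenceFiles; the codec name is resolved by Hadoop at runtime
sink.setWriter(new SequenceFileWriter<IntWritable, Text>(
    "Snappy", SequenceFile.CompressionType.BLOCK));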
There are two configuration options that specify when a part file should be closed and a new one started:
- By setting a batch size (the default part file size is 384 MB)
- By setting a batch roll over time interval (the default roll over interval is Long.MAX_VALUE)

A new part file is started when either of these two conditions is satisfied.
Example:
DataStream<Tuple2<IntWritable,Text>> input = ...;
BucketingSink<Tuple2<IntWritable,Text>> sink = new BucketingSink<Tuple2<IntWritable,Text>>("/base/path");
sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")));
sink.setWriter(new SequenceFileWriter<IntWritable, Text>());
sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB
sink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins
input.addSink(sink);
// the SequenceFileWriter only works with Flink Tuples
import org.apache.flink.api.java.tuple.Tuple2
val input: DataStream[Tuple2[IntWritable, Text]] = ...
val sink = new BucketingSink[Tuple2[IntWritable, Text]]("/base/path")
sink.setBucketer(new DateTimeBucketer[Tuple2[IntWritable, Text]]("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")))
sink.setWriter(new SequenceFileWriter[IntWritable, Text])
sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB
sink.setBatchRolloverInterval(20 * 60 * 1000) // this is 20 mins
input.addSink(sink)
This will create a sink that writes to bucket files that follow this schema:
/base/path/{date-time}/part-{parallel-task}-{count}
Where date-time is the string that we get from the date/time format, parallel-task is the index of the parallel sink instance, and count is the running number of part files that were created because of the batch size or batch roll over interval.
For in-depth information, please refer to the JavaDoc for BucketingSink.