Streaming Write Pipeline

This document discusses the new Streaming Write Pipeline feature in Ozone. It is implemented with the Ratis Streaming API. Note that the existing Ozone Write Pipeline is implemented with the Ratis Async API. We refer the new Streaming Write Pipeline as Write Pipeline V2 and the existing Async Write Pipeline as Write Pipeline V1.

The Streaming Write Pipeline V2 increases the performance by providing better network topology awareness and removing the performance bottlenecks in V1. The V2 implementation also avoids unnecessary buffer copying (by Netty zero copy) and has a better utilization of the CPUs and the disks in each datanode.

Configuration Properties

Set the following properties to the Ozone configuration file ozone-site.xml.

  • To enable the Streaming Write Pipeline feature, set the following property to true.
  1. <property>
  2. <name>dfs.container.ratis.datastream.enabled</name>
  3. <value>false</value>
  4. <tag>OZONE, CONTAINER, RATIS, DATASTREAM</tag>
  5. <description>It specifies whether to enable data stream of container.</description>
  6. </property>
  • Datanodes listen to the following port for the streaming traffic.
  1. <property>
  2. <name>dfs.container.ratis.datastream.port</name>
  3. <value>9855</value>
  4. <tag>OZONE, CONTAINER, RATIS, DATASTREAM</tag>
  5. <description>The datastream port number of container.</description>
  6. </property>
  • To use Streaming in FileSystem API, set the following property to true.
  1. <property>
  2. <name>ozone.fs.datastream.enabled</name>
  3. <value>false</value>
  4. <tag>OZONE, DATANODE</tag>
  5. <description>
  6. To enable/disable filesystem write via ratis streaming.
  7. </description>
  8. </property>

Client APIs

OzoneDataStreamOutput

The new OzoneDataStreamOutput class is very similar to the existing OzoneOutputStream class, except that OzoneDataStreamOutput uses ByteBuffer as a parameter in the write methods while OzoneOutputStream uses byte[]. The reason of using a ByteBuffer, instead of a byte[], is to support zero buffer copying. A typical write method is shown below:

  • OzoneDataStreamOutput
  1. public void write(ByteBuffer b, int off, int len) throws IOException;
  • OzoneOutputStream
  1. public void write(byte[] b, int off, int len) throws IOException;

OzoneBucket

The following new methods are added to OzoneBucket for creating keys using the Streaming Write Pipeline.

  • createStreamKey
  1. public OzoneDataStreamOutput createStreamKey(String key, long size)
  2. throws IOException;
  1. public OzoneDataStreamOutput createStreamKey(String key, long size,
  2. ReplicationConfig replicationConfig, Map<String, String> keyMetadata)
  3. throws IOException;
  • createMultipartStreamKey
  1. public OzoneDataStreamOutput createMultipartStreamKey(String key, long size,
  2. int partNumber, String uploadID) throws IOException;

Note that the methods above have the same parameter list as the existing createKey and createMultipartKey methods.

Below is an example to create a key from a local file using a memory-mapped buffer.

  1. // Create a memory-mapped buffer from a local file:
  2. final FileChannel channel = ... // local file channel
  3. final long length = ... // length of the data
  4. final ByteBuffer mapped = channel.map(FileChannel.MapMode.READ_ONLY, 0, length);
  5. // Create an OzoneDataStreamOutput
  6. final OzoneBucket bucket = ... // an Ozone bucket
  7. final String key = ... // the key name
  8. final OzoneDataStreamOutput out = bucket.createStreamKey(key, length);
  9. // Write the memory-mapped buffer to the key output
  10. out.write(mapped);
  11. // close
  12. out.close(); // In practice, use try-with-resource to close it.
  13. channel.close(); // In practice, use try-with-resource to close it.