IoTDB 与 Apache Flink 的集成。此模块包含了 IoTDB sink,允许 Flink job 将时序数据写入 IoTDB。

IoTDBSink

使用 IoTDBSink ,您需要定义一个 IoTDBOptions 和一个 IoTSerializationSchema 实例。 IoTDBSink 默认每次发送一个数据,可以通过调用 withBatchSize(int) 进行调整。

示例

该示例演示了如下从一个 Flink job 中发送数据到 IoTDB server 的场景:

  • 一个模拟的 Source SensorSource 每秒钟产生一个数据点。

  • Flink 使用 IoTDBSink 消费产生的数据并写入 IoTDB。

    1. import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
    2. import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
    3. import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
    4. import com.google.common.collect.Lists;
    5. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    6. import org.apache.flink.streaming.api.functions.source.SourceFunction;
    7. import java.security.SecureRandom;
    8. import java.util.HashMap;
    9. import java.util.Map;
    10. import java.util.Random;
    11. public class FlinkIoTDBSink {
    12. public static void main(String[] args) throws Exception {
    13. // run the flink job on local mini cluster
    14. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    15. IoTDBOptions options = new IoTDBOptions();
    16. options.setHost("127.0.0.1");
    17. options.setPort(6667);
    18. options.setUser("root");
    19. options.setPassword("root");
    20. options.setStorageGroup("root.sg");
    21. // If the server enables auto_create_schema, then we do not need to register all timeseries
    22. // here.
    23. options.setTimeseriesOptionList(
    24. Lists.newArrayList(
    25. new IoTDBOptions.TimeseriesOption(
    26. "root.sg.d1.s1", TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY)));
    27. IoTSerializationSchema serializationSchema = new DefaultIoTSerializationSchema();
    28. IoTDBSink ioTDBSink =
    29. new IoTDBSink(options, serializationSchema)
    30. // enable batching
    31. .withBatchSize(10)
    32. // how many connectons to the server will be created for each parallelism
    33. .withSessionPoolSize(3);
    34. env.addSource(new SensorSource())
    35. .name("sensor-source")
    36. .setParallelism(1)
    37. .addSink(ioTDBSink)
    38. .name("iotdb-sink");
    39. env.execute("iotdb-flink-example");
    40. }
    41. private static class SensorSource implements SourceFunction<Map<String, String>> {
    42. boolean running = true;
    43. Random random = new SecureRandom();
    44. @Override
    45. public void run(SourceContext context) throws Exception {
    46. while (running) {
    47. Map<String, String> tuple = new HashMap();
    48. tuple.put("device", "root.sg.d1");
    49. tuple.put("timestamp", String.valueOf(System.currentTimeMillis()));
    50. tuple.put("measurements", "s1");
    51. tuple.put("types", "DOUBLE");
    52. tuple.put("values", String.valueOf(random.nextDouble()));
    53. context.collect(tuple);
    54. Thread.sleep(1000);
    55. }
    56. }
    57. @Override
    58. public void cancel() {
    59. running = false;
    60. }
    61. }
    62. }

运行方法

  • 启动 IoTDB server
  • 运行 org.apache.iotdb.flink.FlinkIoTDBSink.java 将 Flink job 运行在本地的集群上。