触发器

触发器提供了一种侦听序列数据变动的机制。配合用户自定义逻辑,可完成告警、数据清洗、数据转发等功能。

触发器基于 Java 反射机制实现。用户通过简单实现 Java 接口,即可实现数据侦听。IoTDB 允许用户动态装载、卸载触发器,在装载、卸载期间,无需启停服务器。

根据此文档,您将会很快学会触发器的编写与管理。

编写触发器

触发器依赖

触发器的逻辑需要您编写 Java 类进行实现。

在编写触发器逻辑时,需要使用到下面展示的依赖。如果您使用 Maven触发器 - 图1 (opens new window),则可以直接从 Maven 库触发器 - 图2 (opens new window) 中搜索到它们。

  1. <dependency>
  2. <groupId>org.apache.iotdb</groupId>
  3. <artifactId>iotdb-server</artifactId>
  4. <version>0.13.0-SNAPSHOT</version>
  5. <scope>provided</scope>
  6. </dependency>

请注意选择和目标服务器版本相同的依赖版本。

用户编程接口

编写一个触发器需要实现org.apache.iotdb.db.engine.trigger.api.Trigger类。

该类提供了两类编程接口:生命周期钩子数据变动侦听钩子。该类中所有的接口都不是必须实现的,当您不实现它们时,它们不会对流经的数据操作产生任何响应。您可以根据实际需要,只实现其中若干接口。

下面是所有可供用户进行实现的接口的说明。

生命周期钩子

接口定义描述
void onCreate(TriggerAttributes attributes) throws Exception当您使用CREATE TRIGGER语句注册触发器后,该钩子会被调用一次。在每一个实例的生命周期内,该钩子会且仅仅会被调用一次。该钩子主要有如下作用:1. 帮助用户解析 SQL 语句中的自定义属性(使用TriggerAttributes)。 2. 创建或申请资源,如建立外部链接、打开文件等。
void onDrop() throws Exception当您使用DROP TRIGGER语句删除触发器后,该钩子会被调用。在每一个实例的生命周期内,该钩子会且仅仅会被调用一次。该钩子的主要作用是进行一些资源释放等的操作。
void onStart() throws Exception当您使用START TRIGGER语句手动启动(被STOP TRIGGER语句停止的)触发器后,该钩子会被调用。
void onStop() throws Exception当您使用STOP TRIGGER语句手动停止触发器后,该钩子会被调用。

数据变动侦听钩子

目前触发器仅能侦听数据插入的操作。

数据变动侦听钩子的调用时机由CREATE TRIGGER语句显式指定,在编程接口层面不作区分。

单点数据插入侦听钩子
  1. Integer fire(long timestamp, Integer value) throws Exception;
  2. Long fire(long timestamp, Long value) throws Exception;
  3. Float fire(long timestamp, Float value) throws Exception;
  4. Double fire(long timestamp, Double value) throws Exception;
  5. Boolean fire(long timestamp, Boolean value) throws Exception;
  6. Binary fire(long timestamp, Binary value) throws Exception;

对于注册序列上的每一点数据插入,触发器都会调用fire作为响应,钩子的入参timestampvalue即是这一次插入数据点的时间和数据值。您可以在fire钩子中编写处理数据的任意逻辑。

注意,目前钩子的返回值是没有任何意义的。

批量数据插入侦听钩子
  1. int[] fire(long[] timestamps, int[] values) throws Exception;
  2. long[] fire(long[] timestamps, long[] values) throws Exception;
  3. float[] fire(long[] timestamps, float[] values) throws Exception;
  4. double[] fire(long[] timestamps, double[] values) throws Exception;
  5. boolean[] fire(long[] timestamps, boolean[] values) throws Exception;
  6. Binary[] fire(long[] timestamps, Binary[] values) throws Exception;

如果您需要在业务场景中使用到 Session API 的insertTablet接口或insertTablets接口,那么您可以通过实现上述数据插入的侦听钩子来降低触发器的调用开销。

推荐您在实现上述批量数据插入的侦听钩子时, 保证批量数据插入侦听钩子与单点数据插入侦听钩子的行为具有一致性。当您不实现批量数据插入的侦听钩子时,它将遵循下面的默认逻辑。

  1. default int[] fire(long[] timestamps, int[] values) throws Exception {
  2. int size = timestamps.length;
  3. for (int i = 0; i < size; ++i) {
  4. fire(timestamps[i], values[i]);
  5. }
  6. return values;
  7. }

注意,目前钩子的返回值是没有任何意义的。

重要注意事项

  • 每条序列上注册的触发器都是一个完整的触发器类的实例,因此您可以在触发器中维护一些状态数据。
  • 触发器维护的状态会在系统停止后被清空(除非您在钩子中主动将状态持久化)。换言之,系统启动后触发器的状态将会默认为初始值。
  • 一个触发器所有钩子的调用都是串行化的。

管理触发器

您可以通过 SQL 语句注册、卸载、启动或停止一个触发器实例,您也可以通过 SQL 语句查询到所有已经注册的触发器。

触发器的状态

触发器有两种运行状态:STARTEDSTOPPED,您需要执行START TRIGGER或者STOP TRIGGER来启动或者停止一个触发器。

当一个触发器的状态为STOPPED时,它将不会响应被注册序列上的操作(如插入数据点的操作),对外表现就会像是这个序列没有被注册过触发器一样,但是它会保存所有的状态(触发器类变量)信息,同时也会保存所有的注册信息。

注意,通过CREATE TRIGGER语句注册的触发器默认是STARTED的。

注册触发器

触发器只能注册在一个已经存在的时间序列上。任何时间序列只允许注册一个触发器。

被注册有触发器的序列将会被触发器侦听,当序列上有数据变动时,触发器中对应的钩子将会被调用。

注册一个触发器可以按如下流程进行:

  1. 实现一个完整的 Trigger 类,假定这个类的全类名为org.apache.iotdb.db.engine.trigger.example.AlertListener

  2. 将项目打成 JAR 包,如果您使用 Maven 管理项目,可以参考上述 Maven 项目示例的写法

  3. 将 JAR 包放置到目录 iotdb-server-0.13.0-SNAPSHOT/ext/trigger (也可以是iotdb-server-0.13.0-SNAPSHOT/ext/trigger的子目录)下。

    您可以通过修改配置文件中的trigger_root_dir来指定加载触发器 JAR 包的根路径。

  4. 使用 SQL 语句注册该触发器,假定赋予该触发器的名字为alert-listener-sg1d1s1

  5. 使用CREATE TRIGGER语句注册该触发器

    1. CREATE TRIGGER `alert-listener-sg1d1s1`
    2. AFTER INSERT
    3. ON root.sg1.d1.s1
    4. AS 'org.apache.iotdb.db.engine.trigger.example.AlertListener'
    5. WITH (
    6. 'lo' = '0',
    7. 'hi' = '100.0'
    8. )

注册触发器的详细 SQL 语法如下:

  1. CREATE TRIGGER <TRIGGER-NAME>
  2. (BEFORE | AFTER) INSERT
  3. ON <FULL-PATH>
  4. AS <CLASSNAME>

同时,您还可以通过WITH子句传入任意数量的自定义属性值:

  1. CREATE TRIGGER <TRIGGER-NAME>
  2. (BEFORE | AFTER) INSERT
  3. ON <FULL-PATH>
  4. AS <CLASSNAME>
  5. WITH (
  6. <KEY-1>=<VALUE-1>,
  7. <KEY-2>=<VALUE-2>,
  8. ...
  9. )

TRIGGER-NAME是用于标定触发器的全局唯一 ID,它是大小写敏感的。

目前触发器可以侦听序列上的所有的数据插入操作,触发器可以选择在数据插入前(BEFORE INSERT)或者数据插入后(AFTER INSERT)触发钩子调用。

FULL-PATH是触发器侦听的目标序列名称,这个序列必须是一个测点。

CLASSNAME是触发器类的全类名。

请注意,CLASSNAME以及属性值中的KEYVALUE都需要被单引号或者双引号引用起来。

卸载触发器

触发器会在下面几种情景下被卸载:

  1. 用户执行DELETE TIMESERIES时,序列上注册的触发器会被卸载
  2. 用户执行DELETE STORAGE GROUP时,对应存储组下注册的触发器会全部被卸载
  3. 用户使用DROP TRIGGER语句主动卸载

卸载触发器的 SQL 语法如下:

  1. DROP TRIGGER <TRIGGER-NAME>

TRIGGER-NAME是用于标定触发器的全局唯一 ID。

下面是一个DROP TRIGGER语句的例子:

  1. DROP TRIGGER `alert-listener-sg1d1s1`

启动触发器

该操作是“停止触发器”的逆操作。它将运行状态为STOPPED的触发器的运行状态变更为STARTED,这会使得触发器重新侦听被注册序列上的操作,并对数据变动产生响应。

启动触发器的 SQL 语法如下:

  1. START TRIGGER <TRIGGER-NAME>

TRIGGER-NAME是用于标定触发器的全局唯一 ID。

下面是一个START TRIGGER语句的例子:

  1. START TRIGGER `alert-listener-sg1d1s1`

注意,通过CREATE TRIGGER语句注册的触发器默认是STARTED的。

停止触发器

该操作将触发器的状态由STARTED变为STOPPED。当一个触发器的状态为STOPPED时,它将不会响应被注册序列上的操作(如插入数据点的操作),对外表现就会像是这个序列没有被注册过触发器一样。您可以使用START TRIGGER语句重新启动一个触发器。

停止触发器的 SQL 语法如下:

  1. STOP TRIGGER <TRIGGER-NAME>

TRIGGER-NAME是用于标定触发器的全局唯一 ID。

下面是一个STOP TRIGGER语句的例子:

  1. STOP TRIGGER `alert-listener-sg1d1s1`

查询所有注册的触发器

查询触发器的 SQL 语句如下:

  1. SHOW TRIGGERS

该语句展示已注册触发器的 ID、运行状态、触发时机、被注册的序列、触发器实例的全类名和注册触发器时用到的自定义属性。

用户权限管理

用户在使用触发器时会涉及到 4 种权限:

  • CREATE_TRIGGER:具备该权限的用户才被允许注册触发器操作。该权限需要与触发器的路径绑定。
  • DROP_TRIGGER:具备该权限的用户才被允许卸载触发器操作。该权限需要与触发器的路径绑定。
  • START_TRIGGER:具备该权限的用户才被允许启动已被停止的触发器。该权限需要与触发器的路径绑定。
  • STOP_TRIGGER:具备该权限的用户才被允许停止正在运行的触发器。该权限需要与触发器的路径绑定。

更多用户权限相关的内容,请参考 权限管理语句

实用工具类

实用工具类为常见的需求提供了编程范式和执行框架,它能够简化您编写触发器的一部分工作。

窗口工具类

窗口工具类能够辅助您定义滑动窗口以及窗口上的数据处理逻辑。它能够构造两类滑动窗口:一种滑动窗口是固定窗口内时间长度的(SlidingTimeWindowEvaluationHandler),另一种滑动窗口是固定窗口内数据点数的(SlidingSizeWindowEvaluationHandler)。

窗口工具类允许您在窗口(Window)上定义侦听钩子(Evaluator)。每当一个新的窗口形成,您定义的侦听钩子就会被调用一次。您可以在这个侦听钩子内定义任何数据处理相关的逻辑。侦听钩子的调用是异步的,因此,在执行钩子内窗口处理逻辑的时候,是不会阻塞当前线程的。

值得注意的是,不论是SlidingTimeWindowEvaluationHandler还是SlidingSizeWindowEvaluationHandler,他们都只能够处理时间戳严格单调递增的序列,传入的不符合要求的数据点会被工具类抛弃。

WindowEvaluator接口的定义见org.apache.iotdb.db.utils.windowing.api包。

固定窗口内数据点数的滑动窗口

窗口构造

共两种构造方法。

第一种方法需要您提供窗口接受数据点的类型、窗口大小、滑动步长和一个侦听钩子(Evaluator)。

  1. final TSDataType dataType = TSDataType.INT32;
  2. final int windowSize = 10;
  3. final int slidingStep = 5;
  4. SlidingSizeWindowEvaluationHandler handler =
  5. new SlidingSizeWindowEvaluationHandler(
  6. new SlidingSizeWindowConfiguration(dataType, windowSize, slidingStep),
  7. window -> {
  8. // do something
  9. });

第二种方法需要您提供窗口接受数据点的类型、窗口大小和一个侦听钩子(Evaluator)。这种构造方法下的窗口滑动步长等于窗口大小。

  1. final TSDataType dataType = TSDataType.INT32;
  2. final int windowSize = 10;
  3. SlidingSizeWindowEvaluationHandler handler =
  4. new SlidingSizeWindowEvaluationHandler(
  5. new SlidingSizeWindowConfiguration(dataType, windowSize),
  6. window -> {
  7. // do something
  8. });

窗口大小、滑动步长必须为正数。

数据接收
  1. final long timestamp = 0;
  2. final int value = 0;
  3. hander.collect(timestamp, value);

注意,collect方法接受的第二个参数类型需要与构造时传入的dataType声明一致。

此外,collect方法只会对时间戳是单调递增的数据点产生响应。如果某一次collect方法采集到的数据点的时间戳小于等于上一次collect方法采集到的数据点时间戳,那么这一次采集的数据点将会被抛弃。

还需要注意的是,collect方法不是线程安全的。

固定窗口内时间长度的滑动窗口

窗口构造

共两种构造方法。

第一种方法需要您提供窗口接受数据点的类型、窗口内时间长度、滑动步长和一个侦听钩子(Evaluator)。

  1. final TSDataType dataType = TSDataType.INT32;
  2. final long timeInterval = 1000;
  3. final long slidingStep = 500;
  4. SlidingTimeWindowEvaluationHandler handler =
  5. new SlidingTimeWindowEvaluationHandler(
  6. new SlidingTimeWindowConfiguration(dataType, timeInterval, slidingStep),
  7. window -> {
  8. // do something
  9. });

第二种方法需要您提供窗口接受数据点的类型、窗口内时间长度和一个侦听钩子(Evaluator)。这种构造方法下的窗口滑动步长等于窗口内时间长度。

  1. final TSDataType dataType = TSDataType.INT32;
  2. final long timeInterval = 1000;
  3. SlidingTimeWindowEvaluationHandler handler =
  4. new SlidingTimeWindowEvaluationHandler(
  5. new SlidingTimeWindowConfiguration(dataType, timeInterval),
  6. window -> {
  7. // do something
  8. });

窗口内时间长度、滑动步长必须为正数。

数据接收
  1. final long timestamp = 0;
  2. final int value = 0;
  3. hander.collect(timestamp, value);

注意,collect方法接受的第二个参数类型需要与构造时传入的dataType声明一致。

此外,collect方法只会对时间戳是单调递增的数据点产生响应。如果某一次collect方法采集到的数据点的时间戳小于等于上一次collect方法采集到的数据点时间戳,那么这一次采集的数据点将会被抛弃。

还需要注意的是,collect方法不是线程安全的。

拒绝策略

窗口计算的任务执行是异步的。

当异步任务无法被执行线程池及时消费时,会产生任务堆积。在极端情况下,异步任务的堆积会导致系统 OOM。因此,窗口计算线程池允许堆积的任务数量被设定为有限值。

当堆积的任务数量超出限值时,新提交的任务将无法进入线程池执行,此时,系统会调用您在侦听钩子(Evaluator)中制定的拒绝策略钩子onRejection进行处理。

onRejection的默认行为如下。

  1. default void onRejection(Window window) {
  2. throw new RejectedExecutionException();
  3. }

制定拒绝策略钩子的方式如下。

  1. SlidingTimeWindowEvaluationHandler handler =
  2. new SlidingTimeWindowEvaluationHandler(
  3. new SlidingTimeWindowConfiguration(TSDataType.INT32, 1, 1),
  4. new Evaluator() {
  5. @Override
  6. public void evaluate(Window window) {
  7. // do something
  8. }
  9. @Override
  10. public void onRejection(Window window) {
  11. // do something
  12. }
  13. });

配置参数

concurrent_window_evaluation_thread

窗口计算线程池的默认线程数。默认为 CPU 核数。

max_pending_window_evaluation_tasks

最多允许堆积的窗口计算任务。默认为 64 个。

Sink 工具类

Sink 工具类为触发器提供了连接外部系统的能力。

它提供了一套编程范式。每一个 Sink 工具都包含一个用于处理数据发送的Handler、一个用于配置HandlerConfiguration,还有一个用于描述发送数据的Event

LocalIoTDBSink

LocalIoTDBSink用于向本地序列写入数据点。

在写入数据前,不要求时间序列已被创建。

注意,在触发器场景中,侦听的时间序列和写入的目标时间序列不要在同一个存储组下。

使用示例:

  1. final String device = "root.alerting";
  2. final String[] measurements = new String[] {"local"};
  3. final TSDataType[] dataTypes = new TSDataType[] {TSDataType.DOUBLE};
  4. LocalIoTDBHandler localIoTDBHandler = new LocalIoTDBHandler();
  5. localIoTDBHandler.open(new LocalIoTDBConfiguration(device, measurements, dataTypes));
  6. // insert 100 data points
  7. for (int i = 0; i < 100; ++i) {
  8. final long timestamp = i;
  9. final double value = i;
  10. localIoTDBHandler.onEvent(new LocalIoTDBEvent(timestamp, value));
  11. }

注意,当您需要向某个TEXT类型的序列写入数据时,您需要借助org.apache.iotdb.tsfile.utils.Binary

  1. // insert 100 data points
  2. for (int i = 0; i < 100; ++i) {
  3. final long timestamp = i;
  4. final String value = "" + i;
  5. localIoTDBHandler.onEvent(new LocalIoTDBEvent(timestamp, Binary.valueOf(value)));
  6. }

MQTTSink

触发器可以使用MQTTSink向其他的 IoTDB 实例发送数据点。

在发送数据前,不要求时间序列已被创建。

使用示例:

  1. final String host = "127.0.0.1";
  2. final int port = 1883;
  3. final String username = "root";
  4. final String password = "root";
  5. final PartialPath device = new PartialPath("root.alerting");
  6. final String[] measurements = new String[] {"remote"};
  7. MQTTHandler mqttHandler = new MQTTHandler();
  8. mqttHandler.open(new MQTTConfiguration(host, port, username, password, device, measurements));
  9. final String topic = "test";
  10. final QoS qos = QoS.EXACTLY_ONCE;
  11. final boolean retain = false;
  12. // send 100 data points
  13. for (int i = 0; i < 100; ++i) {
  14. final long timestamp = i;
  15. final double value = i;
  16. mqttHandler.onEvent(new MQTTEvent(topic, qos, retain, timestamp, value));
  17. }

AlertManagerSink

触发器可以使用AlertManagerSink 向 AlertManager 发送消息。

AlertManagerConfiguration 的构造需传入 AlertManager 的发送告警的 endpoint。

  1. AlertManagerConfiguration(String endpoint);

AlertManagerEvent 提供三种构造函数:

  1. AlertManagerEvent(String alertname);
  2. AlertManagerEvent(String alertname, Map<String, String> extraLabels);
  3. AlertManagerEvent(String alertname, Map<String, String> extraLabels, Map<String, String> annotations);

其中:

  • alertname 是必传参数,用于标识一个 alertalertname 字段可用于 AlertManager 发送告警时的分组和消重。
  • extraLabels 可选传,在后台与 alertname 组合成 labels 一起标识一个 alert,可用于 AlertManager 发送告警时的分组和消重。
  • annotations 可选传,它的 value 值可使用 Go 语言模板风格的

    1. {{.<label_key>}}

    它在最终生成消息时会被替换为 labels[<label_key>]

  • labelsannotations 会被解析成 json 字符串发送给 AlertManager
  1. {
  2. "labels": {
  3. "alertname": "<requiredAlertName>",
  4. "<labelname>": "<labelvalue>",
  5. ...
  6. },
  7. "annotations": {
  8. "<labelname>": "<labelvalue>",
  9. ...
  10. }
  11. }

调用 AlertManagerHandleronEvent(AlertManagerEvent event) 方法发送一个告警。

使用示例 1:

只传 alertname

  1. AlertManagerHandler alertManagerHandler = new AlertManagerHandler();
  2. alertManagerHandler.open(new AlertManagerConfiguration("http://127.0.0.1:9093/api/v1/alerts"));
  3. final String alertName = "test0";
  4. AlertManagerEvent alertManagerEvent = new AlertManagerEvent(alertName);
  5. alertManagerHandler.onEvent(alertManagerEvent);

使用示例 2:

传入 alertnameextraLabels

  1. AlertManagerHandler alertManagerHandler = new AlertManagerHandler();
  2. alertManagerHandler.open(new AlertManagerConfiguration("http://127.0.0.1:9093/api/v1/alerts"));
  3. final String alertName = "test1";
  4. final HashMap<String, String> extraLabels = new HashMap<>();
  5. extraLabels.put("severity", "critical");
  6. extraLabels.put("series", "root.ln.wt01.wf01.temperature");
  7. extraLabels.put("value", String.valueOf(100.0));
  8. AlertManagerEvent alertManagerEvent = new AlertManagerEvent(alertName, extraLabels);
  9. alertManagerHandler.onEvent(alertManagerEvent);

使用示例 3:

传入 alertnameextraLabelsannotations

最终 description 字段的值会被解析为 test2: root.ln.wt01.wf01.temperature is 100.0

  1. AlertManagerHandler alertManagerHandler = new AlertManagerHandler();
  2. alertManagerHandler.open(new AlertManagerConfiguration("http://127.0.0.1:9093/api/v1/alerts"));
  3. final String alertName = "test2";
  4. final HashMap<String, String> extraLabels = new HashMap<>();
  5. extraLabels.put("severity", "critical");
  6. extraLabels.put("series", "root.ln.wt01.wf01.temperature");
  7. extraLabels.put("value", String.valueOf(100.0));
  8. final HashMap<String, String> annotations = new HashMap<>();
  9. annotations.put("summary", "high temperature");
  10. annotations.put("description", "{{.alertname}}: {{.series}} is {{.value}}");
  11. alertManagerHandler.onEvent(new AlertManagerEvent(alertName, extraLabels, annotations));

完整的 Maven 示例项目

如果您使用 Maven触发器 - 图3 (opens new window),可以参考我们编写的示例项目 trigger-example

您可以在 这里触发器 - 图4 (opens new window) 找到它。

它展示了:

  • 如何使用 Maven 管理您的 trigger 项目
  • 如何基于触发器的用户编程接口实现数据侦听
  • 如何使用窗口工具类
  • 如何使用 Sink 工具类
  1. package org.apache.iotdb.trigger;
  2. import org.apache.iotdb.db.engine.trigger.api.Trigger;
  3. import org.apache.iotdb.db.engine.trigger.api.TriggerAttributes;
  4. import org.apache.iotdb.db.metadata.path.PartialPath;
  5. import org.apache.iotdb.db.engine.trigger.sink.mqtt.MQTTConfiguration;
  6. import org.apache.iotdb.db.engine.trigger.sink.mqtt.MQTTEvent;
  7. import org.apache.iotdb.db.engine.trigger.sink.mqtt.MQTTHandler;
  8. import org.apache.iotdb.db.engine.trigger.sink.local.LocalIoTDBConfiguration;
  9. import org.apache.iotdb.db.engine.trigger.sink.local.LocalIoTDBEvent;
  10. import org.apache.iotdb.db.engine.trigger.sink.local.LocalIoTDBHandler;
  11. import org.apache.iotdb.db.utils.windowing.configuration.SlidingSizeWindowConfiguration;
  12. import org.apache.iotdb.db.utils.windowing.handler.SlidingSizeWindowEvaluationHandler;
  13. import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
  14. import org.fusesource.mqtt.client.QoS;
  15. import org.slf4j.Logger;
  16. import org.slf4j.LoggerFactory;
  17. public class TriggerExample implements Trigger {
  18. private static final Logger LOGGER = LoggerFactory.getLogger(TriggerExample.class);
  19. private static final String TARGET_DEVICE = "root.alerting";
  20. private final LocalIoTDBHandler localIoTDBHandler = new LocalIoTDBHandler();
  21. private final MQTTHandler mqttHandler = new MQTTHandler();
  22. private SlidingSizeWindowEvaluationHandler windowEvaluationHandler;
  23. @Override
  24. public void onCreate(TriggerAttributes attributes) throws Exception {
  25. LOGGER.info("onCreate(TriggerAttributes attributes)");
  26. double lo = attributes.getDouble("lo");
  27. double hi = attributes.getDouble("hi");
  28. openSinkHandlers();
  29. windowEvaluationHandler =
  30. new SlidingSizeWindowEvaluationHandler(
  31. new SlidingSizeWindowConfiguration(TSDataType.DOUBLE, 5, 5),
  32. window -> {
  33. double avg = 0;
  34. for (int i = 0; i < window.size(); ++i) {
  35. avg += window.getDouble(i);
  36. }
  37. avg /= window.size();
  38. if (avg < lo || hi < avg) {
  39. localIoTDBHandler.onEvent(new LocalIoTDBEvent(window.getTime(0), avg));
  40. mqttHandler.onEvent(
  41. new MQTTEvent("test", QoS.EXACTLY_ONCE, false, window.getTime(0), avg));
  42. }
  43. });
  44. }
  45. @Override
  46. public void onDrop() throws Exception {
  47. LOGGER.info("onDrop()");
  48. closeSinkHandlers();
  49. }
  50. @Override
  51. public void onStart() throws Exception {
  52. LOGGER.info("onStart()");
  53. openSinkHandlers();
  54. }
  55. @Override
  56. public void onStop() throws Exception {
  57. LOGGER.info("onStop()");
  58. closeSinkHandlers();
  59. }
  60. @Override
  61. public Double fire(long timestamp, Double value) {
  62. windowEvaluationHandler.collect(timestamp, value);
  63. return value;
  64. }
  65. @Override
  66. public double[] fire(long[] timestamps, double[] values) {
  67. for (int i = 0; i < timestamps.length; ++i) {
  68. windowEvaluationHandler.collect(timestamps[i], values[i]);
  69. }
  70. return values;
  71. }
  72. private void openSinkHandlers() throws Exception {
  73. localIoTDBHandler.open(
  74. new LocalIoTDBConfiguration(
  75. TARGET_DEVICE, new String[]{"local"}, new TSDataType[]{TSDataType.DOUBLE}));
  76. mqttHandler.open(
  77. new MQTTConfiguration(
  78. "127.0.0.1",
  79. 1883,
  80. "root",
  81. new PartialPath(TARGET_DEVICE),
  82. new String[]{"remote"}));
  83. }
  84. private void closeSinkHandlers() throws Exception {
  85. localIoTDBHandler.close();
  86. mqttHandler.close();
  87. }
  88. }

您可以按照下面的步骤试用这个触发器:

  • iotdb-engine.properties中启用 MQTT 服务

    1. # whether to enable the mqtt service.
    2. enable_mqtt_service=true
  • 启动 IoTDB 服务器

  • 通过 cli 创建时间序列

    1. CREATE TIMESERIES root.sg1.d1.s1 WITH DATATYPE=DOUBLE, ENCODING=PLAIN;
  • trigger-example 中打包好的 JAR(trigger-example-0.13.0-SNAPSHOT.jar)放置到目录 iotdb-server-0.13.0-SNAPSHOT/ext/trigger (也可以是iotdb-server-0.13.0-SNAPSHOT/ext/trigger的子目录)下

    您可以通过修改配置文件中的trigger_root_dir来指定加载触发器 JAR 包的根路径。

  • 使用 SQL 语句注册该触发器,假定赋予该触发器的名字为window-avg-alerter

  • 使用CREATE TRIGGER语句注册该触发器

    1. CREATE TRIGGER `window-avg-alerter`
    2. AFTER INSERT
    3. ON root.sg1.d1.s1
    4. AS 'org.apache.iotdb.trigger.TriggerExample'
    5. WITH (
    6. 'lo' = '0',
    7. 'hi' = '10.0'
    8. )
  • 使用 cli 插入测试数据

    1. INSERT INTO root.sg1.d1(timestamp, s1) VALUES (1, 0);
    2. INSERT INTO root.sg1.d1(timestamp, s1) VALUES (2, 2);
    3. INSERT INTO root.sg1.d1(timestamp, s1) VALUES (3, 4);
    4. INSERT INTO root.sg1.d1(timestamp, s1) VALUES (4, 6);
    5. INSERT INTO root.sg1.d1(timestamp, s1) VALUES (5, 8);
    6. INSERT INTO root.sg1.d1(timestamp, s1) VALUES (6, 10);
    7. INSERT INTO root.sg1.d1(timestamp, s1) VALUES (7, 12);
    8. INSERT INTO root.sg1.d1(timestamp, s1) VALUES (8, 14);
    9. INSERT INTO root.sg1.d1(timestamp, s1) VALUES (9, 16);
    10. INSERT INTO root.sg1.d1(timestamp, s1) VALUES (10, 18);
  • 使用 cli 查询数据以验证触发器的行为

    1. SELECT * FROM root.alerting;
  • 正常情况下,得到如下结果

    1. IoTDB> SELECT * FROM root.alerting;
    2. +-----------------------------+--------------------+-------------------+
    3. | Time|root.alerting.remote|root.alerting.local|
    4. +-----------------------------+--------------------+-------------------+
    5. |1970-01-01T08:00:00.006+08:00| 14.0| 14.0|
    6. +-----------------------------+--------------------+-------------------+
    7. Total line number = 1
    8. It costs 0.006s

以上就是基本的使用方法,希望您能喜欢 😄

重要注意事项

  • 触发器是通过反射技术动态装载的,因此您在装载过程中无需启停服务器。

  • 不同的 JAR 包中最好不要有全类名相同但功能实现不一样的类。例如:触发器trigger1trigger2分别对应资源trigger1.jartrigger2.jar。如果两个 JAR 包里都包含一个org.apache.iotdb.db.engine.trigger.example.AlertListener类,当CREATE TRIGGER使用到这个类时,系统会随机加载其中一个 JAR 包中的类,最终导致触发器执行行为不一致以及其他的问题。

  • 拥有同一个全类名的触发器类的版本管理问题。IoTDB 不允许系统中存在拥有同一全类名但是版本(逻辑)不一样的触发器。

    相关问题:IoTDB 预先注册了 10 个org.apache.iotdb.db.engine.trigger.example.AlertListener触发器实例,DBA 更新了org.apache.iotdb.db.engine.trigger.example.AlertListener的实现和对应的 JAR 包,是否可以只卸载其中 5 个,将这 5 个替换为新的实现?

    回答:无法做到。只有将预先注册的 10 个触发器全部卸载,才能装载到新的触发器实例。在原有触发器没有全部被卸载的情况下,新注册的拥有相同全类名的触发器行为只会与现有触发器的行为一致。