Graph Rule

Originally, eKuiper leverage SQL to define the rule logic. Although it is handy for developers, it is still not easy to use for users with no development knowledge. During runtime, rules are a DAG of elements(source/operator/sink) even when defining by SQL. The graph can be easily mapping to a drag and drop UI to facilitate the users. Thus, an alternative graph property is provided in the rule API.

The graph property is a JSON presentation of the DAG. It is consisted by nodes and topo which defines the nodes in the graph and their edges respectively. Below is a simplest rule defined by graph. It defines 3 nodes demo, humidityFilter and mqttOut. And the graph is linear as demo -> humidityFilter -> mqttOut. The rule will read from mqtt(demo), filter by humidity(humidityFilter) and sink to mqtt(mqttOut).

  1. {
  2. "id": "rule1",
  3. "name": "Test Condition",
  4. "graph": {
  5. "nodes": {
  6. "demo": {
  7. "type": "source",
  8. "nodeType": "mqtt",
  9. "props": {
  10. "datasource": "devices/+/messages"
  11. }
  12. },
  13. "humidityFilter": {
  14. "type": "operator",
  15. "nodeType": "filter",
  16. "props": {
  17. "expr": "humidity > 30"
  18. }
  19. },
  20. "mqttout": {
  21. "type": "sink",
  22. "nodeType": "mqtt",
  23. "props": {
  24. "server": "tcp://${mqtt_srv}:1883",
  25. "topic": "devices/result"
  26. }
  27. }
  28. },
  29. "topo": {
  30. "sources": ["demo"],
  31. "edges": {
  32. "demo": ["humidityFilter"],
  33. "humidityFilter": ["mqttout"]
  34. }
  35. }
  36. }
  37. }

Nodes

Each node in the graph JSON has at least 3 fields:

  • type: the type of the node, could be source, operator and sink.
  • nodeType: the node type which defines the business logic of a node. There are various node types including built-in types and extended types defined by the plugins.
  • props: the properties for the node. It is different for each nodeType.

Node Type

For source node, the nodeType is the type of the source like mqtt and edgex. Please refer to source for all supported types. Notice that, all source node shared the same properties which is the same as the properties when defining a stream. The specific configuration are referred by CONF_KEY. In the below example, the nodeType specifies the source node is a mqtt source. The datasource and format property has the same meaning as defining a stream.

  1. "demo": {
  2. "type": "source",
  3. "nodeType": "mqtt",
  4. "props": {
  5. "datasource": "devices/+/messages",
  6. "format":"json"
  7. }
  8. },

For sink node, the nodeType is the type of the sink like mqtt and edgex. Please refer to sink for all supported types. For all sink nodes, they share some common properties but each type will have some owned properties.

For operator node, the nodeType are newly defined. Each nodeType will have different properties.

Built-in Operator Node Types

Currently, we supported the below node types for operator type.

function

This node defines a function call expression. The node return a new field with the name of the function or the alias name define in the expr property. It has only one property:

  • expr: string, the function call expression.

Example:

  1. "logfunc": {
  2. "type": "operator",
  3. "nodeType": "function",
  4. "props": {
  5. "expr": "log(temperature) as log_temperature"
  6. }
  7. }

aggfunc

This node defines an aggregate function call expression. The input for the node must be a collection of rows as the output of a window. The node will aggregate multiple rows into one aggregated row. For example, calculate the count of the window of 10 rows will produce only one row with field count = 10. Calculate the count of the grouped rows will produce one row for each group. It has only one property:

  • expr: string, the aggregate function call expression.

Example:

  1. "countop": {
  2. "type": "operator",
  3. "nodeType": "aggfunc",
  4. "props": {
  5. "expr": "count(*)"
  6. }
  7. }

filter

This node filter the data stream with a condition expression. It has only one propety:

  • expr: string, the condition bool expression

Example:

  1. "myfilter": {
  2. "type": "operator",
  3. "nodeType": "filter",
  4. "props": {
  5. "expr": "temperature > 20"
  6. }
  7. }

pick

This node selects the fields to be presented in the following workflow. It is usually used in the end of a workflow to define the data to be selected. It has only one property:

  • fields: []string, the fields to be selected

Example:

  1. "pick": {
  2. "type": "operator",
  3. "nodeType": "pick",
  4. "props": {
  5. "fields": ["log_temperature", "humidity", "window_end()"]
  6. }
  7. }

window

This node defines a window in the workflow. It can accept multiple inputs but each input must be a single row. It will produce a collection of rows.

  • type: string, the window type, available values are “tumblingwindow”, “hoppingwindow”, “slidingwindow”, “sessionwindow” and “countwindow”.
  • unit: the time unit to be used. Check time units for all available values.
  • size: int, the window length.
  • interval: int, the window trigger interval.

Example:

  1. "window": {
  2. "type": "operator",
  3. "nodeType": "window",
  4. "props": {
  5. "type": "hoppingwindow",
  6. "unit": "ss",
  7. "size": 10,
  8. "interval": 5
  9. }
  10. },

join

This node can merge data from different sources like a SQL join operation. The input must be a collection of row produced by a window. The output is another row collection whose rows are joined tuples. The properties are:

  • from: string, the left source node to join.
  • joins: an array of join conditions. Each join has the properties:
    • name: string, the right source node to join
    • type: string, the join type, could be inner, left, right, full, cross etc.
    • on: string, the bool expression to define the join condition

Example:

  1. "joinop": {
  2. "type": "operator",
  3. "nodeType": "join",
  4. "props": {
  5. "from": "device1",
  6. "joins": [
  7. {
  8. "name": "device2",
  9. "type": "inner",
  10. "on": "abs(device1.ts - device2.ts) < 200"
  11. }
  12. ]
  13. }
  14. }

groupby

This node defines the dimension to group by. The input must be a collection of rows. The output is a collection of grouped tuples. The properties are:

  • dimensions: []string, the expressions of dimensions

Example:

  1. "groupop": {
  2. "type": "operator",
  3. "nodeType": "groupby",
  4. "props": {
  5. "dimensions": ["device1.humidity"]
  6. }
  7. },

orderby

This node will sort the input collection. So the input must be a collection of rows and the output will be the same type. The properties are:

  • sorts: an array of sort conditions. Each condition has the properties:
    • field: string, the field to be sorted with.
    • order: string, the sorted direction, could be asc or desc.

Example:

  1. "orderop": {
  2. "type": "operator",
  3. "nodeType": "orderby",
  4. "props": {
  5. "sorts": [{
  6. "field": "count",
  7. "order": "desc"
  8. }]
  9. }
  10. }