Ingest Data into PostgreSQL

EMQX supports integration with PostgreSQL, so you can save MQTT messages and client events to PostgreSQL, or use events to trigger the updating or deleting of records, for example to track clients' online status or their online/offline history.

TIP

EMQX Enterprise Edition features. EMQX Enterprise Edition provides comprehensive coverage of key business scenarios, rich data integration, product-level reliability, and 24/7 global technical support. Experience the benefits of this enterprise-ready MQTT messaging platform today.

TIP

This section is also applicable to TimescaleDB and MatrixDB.

Prerequisites

Features List

Quick Start Tutorial

This section introduces how to configure the PostgreSQL data bridge, covering how to set up the PostgreSQL server, create data bridges and rules for forwarding data to PostgreSQL, and test the data bridges and rules.

This tutorial assumes that you run both EMQX and PostgreSQL on the local machine. If you have PostgreSQL and EMQX running remotely, adjust the settings accordingly.

Install PostgreSQL

Install PostgreSQL via Docker, and then start a PostgreSQL container.

  # Start the PostgreSQL Docker container and set the password to public
  docker run --name PostgreSQL -p 5432:5432 -e POSTGRES_PASSWORD=public -d postgres
  # Access the container
  docker exec -it PostgreSQL bash
  # Connect to the PostgreSQL server in the container and enter the preset password
  psql -U postgres -W
  # Create and then select the database
  CREATE DATABASE emqx_data;
  \c emqx_data;
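
If the psql client is installed on your host machine, you can also connect to the containerized server directly instead of entering the container (this assumes the port mapping from the docker run command above):

  # Connect from the host; enter the preset password "public" when prompted
  psql -h 127.0.0.1 -p 5432 -U postgres -W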

Create Data Tables

Use the following SQL statement to create the data table t_mqtt_msg in the PostgreSQL database for storing the message ID, publisher client ID, topic, QoS, payload, and arrival time of every message.

  CREATE TABLE t_mqtt_msg (
    id SERIAL primary key,
    msgid character varying(64),
    sender character varying(64),
    topic character varying(255),
    qos integer,
    retain integer,
    payload text,
    arrived timestamp without time zone
  );
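
To sanity-check the table before wiring up the data bridge, you can insert a test row manually and read it back; the values below are placeholders, and the DELETE keeps the table empty for the rest of the tutorial:

  -- Insert a throwaway row with made-up values, inspect it, then remove it
  INSERT INTO t_mqtt_msg(msgid, sender, topic, qos, retain, payload, arrived)
  VALUES ('test-msg-id', 'test-client', 't/1', 0, 0, '{"msg": "test"}', NOW());
  SELECT * FROM t_mqtt_msg;
  DELETE FROM t_mqtt_msg WHERE msgid = 'test-msg-id';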

Use the following SQL statement to create the data table emqx_client_events in the PostgreSQL database for storing the client ID, event type, and creation time of every event.

  CREATE TABLE emqx_client_events (
    id SERIAL primary key,
    clientid VARCHAR(255),
    event VARCHAR(255),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
  );
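
Because every connect and disconnect is appended as a separate row, a client's current online status can later be derived from this table. For example, the following query (a sketch, not required by the tutorial) returns the most recent event per client:

  -- Latest event per client: 'client.connected' means currently online
  SELECT DISTINCT ON (clientid) clientid, event, created_at
  FROM emqx_client_events
  ORDER BY clientid, created_at DESC;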

Create PostgreSQL Data Bridges

You need to create two data bridges to PostgreSQL: one for message storage and one for event recording.

Message Storage

  1. Go to EMQX Dashboard, and click Integration -> Data Bridge.

  2. Click Create on the top right corner of the page.

  3. In the Create Data Bridge page, click to select PostgreSQL, and then click Next.

  4. Input a name for the data bridge. The name should be a combination of upper/lower case letters and numbers.

  5. Input the connection information:

    • Server Host: Input 127.0.0.1:5432, or the actual host and port if the PostgreSQL server is running remotely.
    • Database Name: Input emqx_data.
    • Username: Input postgres.
    • Password: Input public.
  6. Configure the SQL Template. Use the SQL statement below to insert data (an example of how the placeholders resolve at runtime is shown after these steps).

    Note: This is a preprocessed SQL statement, so the fields should not be enclosed in quotation marks, and do not write a semicolon at the end.

  INSERT INTO t_mqtt_msg(msgid, sender, topic, qos, payload, arrived) VALUES(
    ${id},
    ${clientid},
    ${topic},
    ${qos},
    ${payload},
    TO_TIMESTAMP((${timestamp} :: bigint)/1000)
  )
  7. Advanced settings (optional): Choose whether to use sync or async query mode as needed. For details, see Configuration.
  8. Before clicking Create, you can click Test Connectivity to test that the bridge can connect to the PostgreSQL server.
  9. Then click Create to finish the creation of the data bridge.
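
For reference, EMQX executes the template above as a preprocessed (prepared) statement and binds the ${...} placeholders as parameters taken from each message. The resolved statement is logically equivalent to the following sketch, in which all values are illustrative only:

  -- Illustrative only: a message from client emqx_c on topic t/1 with QoS 0,
  -- published at millisecond timestamp 1674112232000 (2023-01-19 07:10:32 UTC)
  INSERT INTO t_mqtt_msg(msgid, sender, topic, qos, payload, arrived) VALUES(
    '0005F298A0F0AEE2F443000012DC0002',
    'emqx_c',
    't/1',
    0,
    '{ "msg": "hello PostgreSQL" }',
    TO_TIMESTAMP((1674112232000 :: bigint)/1000)
  )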

Online/Offline Status Recording

The operating steps are similar to those in the Message Storage section except for the SQL template and SQL rules.

The SQL template for online/offline status recording is as follows.

Note: This is a preprocessed SQL statement, so the fields should not be enclosed in quotation marks, and do not write a semicolon at the end.

  INSERT INTO emqx_client_events(clientid, event, created_at) VALUES (
    ${clientid},
    ${event},
    TO_TIMESTAMP((${timestamp} :: bigint)/1000)
  )
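
As with message storage, the placeholders are bound from the event data; ${event} resolves to client.connected or client.disconnected, so a resolved statement is logically equivalent to this sketch (illustrative values only):

  -- Illustrative only: client emqx_c connecting at millisecond timestamp 1674112232000
  INSERT INTO emqx_client_events(clientid, event, created_at) VALUES (
    'emqx_c',
    'client.connected',
    TO_TIMESTAMP((1674112232000 :: bigint)/1000)
  )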

Now the PostgreSQL data bridge should appear in the data bridge list (Integration -> Data Bridge) with Resource Status as Connected.

Create Rules for PostgreSQL Data Bridge

After you have successfully created the data bridges to PostgreSQL, you can continue to create rules that specify which data is saved into PostgreSQL, as well as rules for online/offline status recording.

Message Storage

  1. Go to EMQX Dashboard, click Integration -> Rules.

  2. Click Create on the top right corner of the page.

  3. Input my_rule as the rule ID, and set the rule in the SQL Editor. Here we want to save the MQTT messages under topic t/# to PostgreSQL, so we can use the SQL syntax below.

    Note: If you want to specify your own SQL syntax, make sure that you have included all fields required by the data bridge in the SELECT part (an explicit field list is sketched after these steps).

  SELECT
    *
  FROM
    "t/#"
  4. Click the Add Action button, select Forwarding with Data Bridge from the dropdown list, and then select the data bridge we just created under Data bridge.
  5. Click the Add button.
  6. Click the Create button to finish the setup.
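
As noted above, SELECT * works because it includes every field referenced by the SQL template. If you prefer to list the fields explicitly, a sketch of an equivalent rule that selects only the fields used by the message-storage template looks like this:

  SELECT
    id,
    clientid,
    topic,
    qos,
    payload,
    timestamp
  FROM
    "t/#"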

Online/Offline Status Recording

The creation steps are similar to those in the Message Storage section except for the SQL rule.

The SQL rule is as follows:

  SELECT
    *
  FROM
    "$events/client_connected", "$events/client_disconnected"

Now you have successfully created the data bridges and rules for PostgreSQL. You can click Integration -> Flows to view the topology, where you can see that the messages under topic t/# are sent and saved to PostgreSQL after being parsed by rule my_rule.

Test the Data Bridges and Rules

Use MQTTX to send a message to topic t/1. This operation will also trigger online/offline events.

  mqttx pub -i emqx_c -t t/1 -m '{ "msg": "hello PostgreSQL" }'

Check the running status of the two data bridges; there should be one new incoming and one new outgoing message.

Check whether the data is written into the t_mqtt_msg data table.

  emqx_data=# select * from t_mqtt_msg;
   id |              msgid               | sender | topic | qos | retain |            payload            |       arrived
  ----+----------------------------------+--------+-------+-----+--------+-------------------------------+---------------------
    1 | 0005F298A0F0AEE2F443000012DC0002 | emqx_c | t/1   |   0 |        | { "msg": "hello PostgreSQL" } | 2023-01-19 07:10:32
  (1 row)

Check whether the data is written into the emqx_client_events table.

  emqx_data=# select * from emqx_client_events;
   id | clientid |        event        |     created_at
  ----+----------+---------------------+---------------------
    3 | emqx_c   | client.connected    | 2023-01-19 07:10:32
    4 | emqx_c   | client.disconnected | 2023-01-19 07:10:32
  (2 rows)