Save offline messages to Cassandra

Set up the Cassandra database and set the user name and password to root/public. Take MacOS X as an example:

  1. $ brew install cassandra
  2. ## Modify the configuration and disable anonymous authentication
  3. $ vim /usr/local/etc/cassandra/cassandra.yaml
  4. authenticator: PasswordAuthenticator
  5. authorizer: CassandraAuthorizer
  6. $ brew services start cassandra
  7. ## Create root user
  8. $ cqlsh -ucassandra -pcassandra
  9. create user root with password 'public' superuser;

Initialize the Cassandra tablespace:

  1. $ cqlsh -uroot -ppublic
  2. CREATE KEYSPACE mqtt WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'} AND durable_writes = true;

Create the mqtt_msg table:

  1. CREATE TABLE mqtt.mqtt_msg (
  2. topic text,
  3. msgid text,
  4. arrived timestamp,
  5. payload text,
  6. qos int,
  7. retain int,
  8. sender text,
  9. PRIMARY KEY (topic, msgid)
  10. ) WITH CLUSTERING ORDER BY (msgid DESC)
  11. AND bloom_filter_fp_chance = 0.01
  12. AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
  13. AND comment = ''
  14. AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
  15. AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
  16. AND crc_check_chance = 1.0
  17. AND dclocal_read_repair_chance = 0.1
  18. AND default_time_to_live = 0
  19. AND gc_grace_seconds = 864000
  20. AND max_index_interval = 2048
  21. AND memtable_flush_period_in_ms = 0
  22. AND min_index_interval = 128
  23. AND read_repair_chance = 0.0
  24. AND speculative_retry = '99PERCENTILE';

TIP

The message table structure cannot be modified. Please use the above SQL statement to create

Create rules:

Open EMQX DashboardSave offline Message to Cassandra - 图1 (opens new window) and select the “Rules” tab on the left.

Then fill in the rule SQL:

FROM description

t/#: The publisher publishes a message to trigger the action of saving of offline messages to Cassandra

$events/session_subscribed: The subscriber subscribes to topics to trigger the action of getting offline messages

$events/message_acked: The subscriber replies to the message ACK to trigger the action of deleting the offline message that has been received

  1. SELECT * FROM "t/#", "$events/session_subscribed", "$events/message_acked" WHERE topic =~ 't/#'

Save offline Message to Cassandra - 图2

Related actions:

Select “Add Action” on the “Response Action” interface, and then select “Save offline messages to Cassandra” in the “Add Action” drop-down box

Save offline Message to Cassandra - 图3

Now that the resource drop-down box is empty, and you can click “New” in the upper right corner to create a Cassandra resource:

Save offline Message to Cassandra - 图4

The “Create Resource” dialog box pops up

Save offline Message to Cassandra - 图5

Fill in the resource configuration:

Fill in the real Cassandra server address and the values corresponding to other configurations, and then click the “Test Connection” button to ensure that the connection test is successful.

Finally click the “OK” button.

Save offline Message to Cassandra - 图6

Return to the response action interface and click “OK”.

Save offline Message to Cassandra - 图7

Return to the rule creation interface and click “Create”.

Save offline Message to Cassandra - 图8

The rule has been created, and you can send a piece of data through the WebSocket client of Dashboard (The QoS of the published message must be greater than 0):

Save offline Message to Cassandra - 图9

After the message is sent, you can see the message is saved in Cassandra through cqlsh:

Save offline Message to Cassandra - 图10

Use another client to subscribe to the topic “t/1” (the QoS of the subscribed topic must be greater than 0, otherwise the message will be received repeatedly):

Save offline Message to Cassandra - 图11

After subscribing, you will receive the offline message saved in Cassandra immediately:

Save offline Message to Cassandra - 图12

Offline messages will be deleted in Cassandra after being received:

Save offline Message to Cassandra - 图13