Backends

MQTT Message Persistence

One-to-one message Persistence

_images/backends_1.png
  1. PUB publishes a message;
  2. Backend records this message in DB;
  3. SUB subscribes to a topic;
  4. Backend retrieves the messages of this topic from DB;
  5. Messages are sent to SUB;
  6. Once the SUB acknowledged / received the message, backend removes the message from DB.

Many-to-many message Persistence

_images/backends_2.png
  1. PUB publishes a message;
  2. Backend records the message in DB;
  3. SUB1 and SUB2 subscribe to a topic;
  4. Backend retrieves the messages of this topic;
  5. Messages are sent to SUB1 and SUB2;
  6. Backend records the read position of SUB1 and SUB2, the next message’s retrieval starts from this position.

Client Connection State

EMQ X supports retaining the client's connection state in Redis or DB.

Client Subscription by Broker

EMQ X Persistence supports subscription by broker. When a client goes online, the persistence module loads the subscriptions of the client from Redis or Databases.

List of Persistence Plugins

EMQ X allowes storing messages in Redis, MySQL, PostgreSQL, MongoDB, Cassandra, DynamoDB, InfluxDB, OpenTSDB and Timescale:

Persistence Plugins Config File Description
emqx_backend_redis emqx_backend_redis.conf Redis Message Persistence
emqx_backend_mysql emqx_backend_mysql.conf MySQL Message Persistence
emqx_backend_pgsql emqx_backend_pgsql.conf PostgreSQL Message Persistence
emqx_backend_mongo emqx_backend_mongo.conf MongoDB Message Persistence
emqx_backend_cassa emqx_backend_cassa.conf Cassandra Message Persistence
emqx_backend_dynamo emqx_backend_dynamo.conf DynamoDB Message Persistence
emqx_backend_influxdb emqx_backend_influxdb.conf InfluxDB Message Persistence
emqx_backend_opentsdb emqx_backend_opentsdb.conf OpenTSDB Message Persistence
emqx_backend_timescale emqx_backend_timescale.conf Timescale Message Persistence

Redis Backend

Config file: emqx_backend_redis.conf

Configure the Redis Server

Config Connection Pool of Multiple Redis Servers:

## Redis Server
backend.redis.pool1.server = 127.0.0.1:6379

## Redis Sentinel
## backend.redis.server = 127.0.0.1:26378

##Redis Sentinel Cluster name
## backend.redis.sentinel = mymaster

## Redis Pool Size
backend.redis.pool1.pool_size = 8

## Redis database
backend.redis.pool1.database = 1

## Redis subscribe channel
backend.redis.pool1.channel = mqtt_channel

Configure Persistence Hooks

## Expired after seconds, if =< 0 take the default value
backend.redis.msg.expired_after = 3600

## Client Connected Record
backend.redis.hook.client.connected.1    = {"action": {"function": "on_client_connected"}, "pool": "pool1"}

## Subscribe Lookup Record
backend.redis.hook.client.connected.2    = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}

## Client DisConnected Record
backend.redis.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}

## Lookup Unread Message for one QOS > 0
backend.redis.hook.session.subscribed.1  = {"topic": "queue/#", "action": {"function": "on_message_fetch_for_queue"}, "pool": "pool1"}

## Lookup Unread Message for many QOS > 0
backend.redis.hook.session.subscribed.2  = {"topic": "pubsub/#", "action": {"function": "on_message_fetch_for_pubsub"}, "pool": "pool1"}

## Lookup Retain Message
backend.redis.hook.session.subscribed.3  = {"action": {"function": "on_retain_lookup"}, "pool": "pool1"}

## Store Publish Message  QOS > 0
backend.redis.hook.message.publish.1     = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

## Store Retain Message
backend.redis.hook.message.publish.2     = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}

## Delete Retain Message
backend.redis.hook.message.publish.3     = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}

## Store Ack for one
backend.redis.hook.message.acked.1       = {"topic": "queue/#", "action": {"function": "on_message_acked_for_queue"}, "pool": "pool1"}

## Store Ack for many
backend.redis.hook.message.acked.2       = {"topic": "pubsub/#", "action": {"function": "on_message_acked_for_pubsub"}, "pool": "pool1"}

Description of Persistence Hooks

hook topic action/function Description
client.connected   on_client_connected Store client connected state
client.connected   on_subscribe_lookup Subscribe to topics
client.disconnected   on_client_disconnected Store the client disconnected state
session.subscribed queue/# on_message_fetch_for_queue Fetch one to one offline message
session.subscribed pubsub/# on_message_fetch_for_pubsub Fetch one to many offline message
session.subscribed # on_retain_lookup Lookup retained message
message.publish # on_message_publish Store the published messages
message.publish # on_message_retain Store retained messages
message.publish # on_retain_delete Delete retained messages
message.acked queue/# on_message_acked_for_queue Process ACK of one to one messages
message.acked pubsub/# on_message_acked_for_pubsub Process ACK of one to many messages

Redis Command Line Parameters

hook Parameter Example (Fields separated exactly by one space)
client.connected clientid SET conn:${clientid} clientid
client.disconnected clientid SET disconn:${clientid} clientid
session.subscribed clientid, topic, qos HSET sub:${clientid} topic qos
session.unsubscribed clientid, topic SET unsub:${clientid} topic
message.publish message, msgid, topic, payload, qos, clientid RPUSH pub:${topic} msgid
message.acked msgid, topic, clientid HSET ack:${clientid} topic msgid
message.delivered msgid, topic, clientid HSET delivered:${clientid} topic msgid

Configure 'action' with Redis Commands

Redis backend supports raw 'commands' in 'action', e.g.:

## After a client connected to the EMQ X server, it executes a redis command (multiple redis commands also supported)
backend.redis.hook.client.connected.3 = {"action": {"commands": ["SET conn:${clientid} clientid"]}, "pool": "pool1"}

Using Redis Hash for Devices' Connection State

mqtt:client Hash for devices' connection state:

hmset
key = mqtt:client:${clientid}
value = {state:int, online_at:timestamp, offline_at:timestamp}

hset
key = mqtt:node:${node}
field = ${clientid}
value = ${ts}

Lookup devices' connection state:

HGETALL "mqtt:client:${clientId}"

E.g.: Client with ClientId 'test' goes online:

HGETALL mqtt:client:test
1) "state"
2) "1"
3) "online_at"
4) "1481685802"
5) "offline_at"
6) "undefined"

Client with ClientId 'test' goes offline:

HGETALL mqtt:client:test
1) "state"
2) "0"
3) "online_at"
4) "1481685802"
5) "offline_at"
6) "1481685924"

Using Redis Hash for Retained Messages

mqtt:retain Hash for retained messages:

hmset
key = mqtt:retain:${topic}
value = {id: string, from: string, qos: int, topic: string, retain: int, payload: string, ts: timestamp}

Lookup retained message:

HGETALL "mqtt:retain:${topic}"

Lookup retained messages with a topic of 'retain':

HGETALL mqtt:retain:topic
 1) "id"
 2) "6P9NLcJ65VXBbC22sYb4"
 3) "from"
 4) "test"
 5) "qos"
 6) "1"
 7) "topic"
 8) "topic"
 9) "retain"
10) "true"
11) "payload"
12) "Hello world!"
13) "ts"
14) "1481690659"

Using Redis Hash for messages

mqtt:msg Hash for MQTT messages:

hmset
key = mqtt:msg:${msgid}
value = {id: string, from: string, qos: int, topic: string, retain: int, payload: string, ts: timestamp}

zadd
key = mqtt:msg:${topic}
field = 1
value = ${msgid}

Using Redis Set for Message Acknowledgements

mqtt:acked SET stores acknowledgements from the clients:

set
key = mqtt:acked:${clientid}:${topic}
value = ${msgid}

Using Redis Hash for Subscription

mqtt:sub Hash for Subscriptions:

hset
key = mqtt:sub:${clientid}
field = ${topic}
value = ${qos}

A client subscribes to a topic:

HSET mqtt:sub:${clientid} ${topic} ${qos}

A client with ClientId of 'test' subscribes to topic1 and topic2:

HSET "mqtt:sub:test" "topic1" 1
HSET "mqtt:sub:test" "topic2" 2

Lookup the subscribed topics of client with ClientId of 'test':

HGETALL mqtt:sub:test
1) "topic1"
2) "1"
3) "topic2"
4) "2"

Redis SUB/UNSUB Publish

When a device subscribes / unsubscribes to topics, EMQ X broker publish an event to the Redis:

PUBLISH
channel = "mqtt_channel"
message = {type: string , topic: string, clientid: string, qos: int}
\*type: [subscribe/unsubscribe]

client with ClientID 'test' subscribe to 'topic0':

PUBLISH "mqtt_channel" "{\"type\": \"subscribe\", \"topic\": \"topic0\", \"clientid\": \"test\", \"qos\": \"0\"}"

Client with ClientId 'test' unsubscribes to 'test_topic0':

PUBLISH "mqtt_channel" "{\"type\": \"unsubscribe\", \"topic\": \"test_topic0\", \"clientid\": \"test\"}"

Enable Redis Backend

./bin/emqx_ctl plugins load emqx_backend_redis

MySQL Backend

Config file: emqx_backend_mysql.conf

Configure MySQL Server

Connection pool of multiple MySQL servers is supported:

## Mysql Server
backend.mysql.pool1.server = 127.0.0.1:3306

## Mysql Pool Size
backend.mysql.pool1.pool_size = 8

## Mysql Username
backend.mysql.pool1.user = root

## Mysql Password
backend.mysql.pool1.password = public

## Mysql Database
backend.mysql.pool1.database = mqtt

## Max number of fetch offline messages. Without count limit if infinity
## backend.mysql.max_returned_count = 500

## Time Range. Without time limit if infinity
## d - day
## h - hour
## m - minute
## s - second
## backend.mysql.time_range = 2h

Configure MySQL Persistence Hooks

## Client Connected Record
backend.mysql.hook.client.connected.1    = {"action": {"function": "on_client_connected"}, "pool": "pool1"}

## Subscribe Lookup Record
backend.mysql.hook.client.connected.2    = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}

## Client DisConnected Record
backend.mysql.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}

## Lookup Unread Message QOS > 0
backend.mysql.hook.session.subscribed.1  = {"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1"}

## Lookup Retain Message
backend.mysql.hook.session.subscribed.2  = {"topic": "#", "action": {"function": "on_retain_lookup"}, "pool": "pool1"}

## Store Publish Message  QOS > 0
backend.mysql.hook.message.publish.1     = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

## Store Retain Message
backend.mysql.hook.message.publish.2     = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}

## Delete Retain Message
backend.mysql.hook.message.publish.3     = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}

## Store Ack
backend.mysql.hook.message.acked.1       = {"topic": "#", "action": {"function": "on_message_acked"}, "pool": "pool1"}

## Get offline messages
##  "offline_opts": Get configuration for offline messages
##  max_returned_count: Maximum number of offline messages get at a time
##  time_range: Get only messages in the current time range
## backend.mysql.hook.session.subscribed.1  = {"topic": "#", "action": {"function": "on_message_fetch"}, "offline_opts": {"max_returned_count": 500, "time_range": "2h"}, "pool": "pool1"}

## If you need to store Qos0 messages, you can enable the following configuration
## Warning: When the following configuration is enabled, 'on_message_fetch' needs to be disabled, otherwise qos1, qos2 messages will be stored twice
## backend.mysql.hook.message.publish.4     = {"topic": "#", "action": {"function": "on_message_store"}, "pool": "pool1"}

Description of MySQL Persistence Hooks

hook topic action Description
client.connected   on_client_connected Store client connected state
client.connected   on_subscribe_lookup Subscribed topics
client.disconnected   on_client_disconnected Store client disconnected state
session.subscribed # on_message_fetch Fetch offline messages
session.subscribed # on_retain_lookup Lookup retained messages
message.publish # on_message_publish Store published messages
message.publish # on_message_retain Store retained messages
message.publish # on_retain_delete Delete retained messages
message.acked # on_message_acked Process ACK

SQL Parameters Description

hook Parameters Example (${name} represents available parameter)
client.connected clientid insert into conn(clientid) values(${clientid})
client.disconnected clientid insert into disconn(clientid) values(${clientid})
session.subscribed clientid, topic, qos insert into sub(topic, qos) values(${topic}, ${qos})
session.unsubscribed clientid, topic delete from sub where topic = ${topic}
message.publish msgid, topic, payload, qos, clientid insert into msg(msgid, topic) values(${msgid}, ${topic})
message.acked msgid, topic, clientid insert into ack(msgid, topic) values(${msgid}, ${topic})
message.delivered msgid, topic, clientid insert into delivered(msgid, topic) values(${msgid}, ${topic})

Configure 'action' with SQL

MySQL backend supports SQL in 'action':

## After a client is connected to the EMQ X server, it executes a SQL command (multiple SQL commands also supported)
backend.mysql.hook.client.connected.3 = {"action": {"sql": ["insert into conn(clientid) values(${clientid})"]}, "pool": "pool1"}

Create MySQL DB

create database mqtt;

Import MySQL DB & Table Schema

mysql -u root -p mqtt < etc/sql/emqx_backend_mysql.sql

注解

DB name is free of choice

MySQL Client Connection Table

mqtt_client stores client connection states:

DROP TABLE IF EXISTS `mqtt_client`;
CREATE TABLE `mqtt_client` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `clientid` varchar(64) DEFAULT NULL,
  `state` varchar(3) DEFAULT NULL,
  `node` varchar(100) DEFAULT NULL,
  `online_at` datetime DEFAULT NULL,
  `offline_at` datetime DEFAULT NULL,
  `created` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  KEY `mqtt_client_idx` (`clientid`),
  UNIQUE KEY `mqtt_client_key` (`clientid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Query the client connection state:

select * from mqtt_client where clientid = ${clientid};

If client 'test' is online:

select * from mqtt_client where clientid = "test";

+----+----------+-------+----------------+---------------------+---------------------+---------------------+
| id | clientid | state | node           | online_at           | offline_at          | created             |
+----+----------+-------+----------------+---------------------+---------------------+---------------------+
|  1 | test     | 1     | emqx@127.0.0.1 | 2016-11-15 09:40:40 | NULL                | 2016-12-24 09:40:22 |
+----+----------+-------+----------------+---------------------+---------------------+---------------------+
1 rows in set (0.00 sec)

If client 'test' is offline:

select * from mqtt_client where clientid = "test";

+----+----------+-------+----------------+---------------------+---------------------+---------------------+
| id | clientid | state | node           | online_at           | offline_at          | created             |
+----+----------+-------+----------------+---------------------+---------------------+---------------------+
|  1 | test     | 0     | emqx@127.0.0.1 | 2016-11-15 09:40:40 | 2016-11-15 09:46:10 | 2016-12-24 09:40:22 |
+----+----------+-------+----------------+---------------------+---------------------+---------------------+
1 rows in set (0.00 sec)

MySQL Subscription Table

mqtt_sub table stores MQTT subscriptions of clients:

DROP TABLE IF EXISTS `mqtt_sub`;
CREATE TABLE `mqtt_sub` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `clientid` varchar(64) DEFAULT NULL,
  `topic` varchar(255) DEFAULT NULL,
  `qos` int(3) DEFAULT NULL,
  `created` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  KEY `mqtt_sub_idx` (`clientid`,`topic`(255),`qos`),
  UNIQUE KEY `mqtt_sub_key` (`clientid`,`topic`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

E.g., client 'test' subscribes to 'test_topic1' and 'test_topic2':

insert into mqtt_sub(clientid, topic, qos) values("test", "test_topic1", 1);
insert into mqtt_sub(clientid, topic, qos) values("test", "test_topic2", 2);

Query subscription of a client:

select * from mqtt_sub where clientid = ${clientid};

E.g., query the Subscription of client 'test':

select * from mqtt_sub where clientid = "test";

+----+--------------+-------------+------+---------------------+
| id | clientId     | topic       | qos  | created             |
+----+--------------+-------------+------+---------------------+
|  1 | test         | test_topic1 |    1 | 2016-12-24 17:09:05 |
|  2 | test         | test_topic2 |    2 | 2016-12-24 17:12:51 |
+----+--------------+-------------+------+---------------------+
2 rows in set (0.00 sec)

MySQL Message Table

mqtt_msg stores MQTT messages:

DROP TABLE IF EXISTS `mqtt_msg`;
CREATE TABLE `mqtt_msg` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `msgid` varchar(100) DEFAULT NULL,
  `topic` varchar(1024) NOT NULL,
  `sender` varchar(1024) DEFAULT NULL,
  `node` varchar(60) DEFAULT NULL,
  `qos` int(11) NOT NULL DEFAULT '0',
  `retain` tinyint(2) DEFAULT NULL,
  `payload` blob,
  `arrived` datetime NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Query messages published by a client:

select * from mqtt_msg where sender = ${clientid};

Query messages published by client 'test':

select * from mqtt_msg where sender = "test";

+----+-------------------------------+----------+--------+------+-----+--------+---------+---------------------+
| id | msgid                         | topic    | sender | node | qos | retain | payload | arrived             |
+----+-------------------------------+----------+--------+------+-----+--------+---------+---------------------+
| 1  | 53F98F80F66017005000004A60003 | hello    | test   | NULL |   1 |      0 | hello   | 2016-12-24 17:25:12 |
| 2  | 53F98F9FE42AD7005000004A60004 | world    | test   | NULL |   1 |      0 | world   | 2016-12-24 17:25:45 |
+----+-------------------------------+----------+--------+------+-----+--------+---------+---------------------+
2 rows in set (0.00 sec)

MySQL Retained Message Table

mqtt_retain stores retained messages:

DROP TABLE IF EXISTS `mqtt_retain`;
CREATE TABLE `mqtt_retain` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `topic` varchar(200) DEFAULT NULL,
  `msgid` varchar(60) DEFAULT NULL,
  `sender` varchar(100) DEFAULT NULL,
  `node` varchar(100) DEFAULT NULL,
  `qos` int(2) DEFAULT NULL,
  `payload` blob,
  `arrived` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `mqtt_retain_key` (`topic`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Query retained messages:

select * from mqtt_retain where topic = ${topic};

Query retained messages with topic 'retain':

select * from mqtt_retain where topic = "retain";

+----+----------+-------------------------------+---------+------+------+---------+---------------------+
| id | topic    | msgid                         | sender  | node | qos  | payload | arrived             |
+----+----------+-------------------------------+---------+------+------+---------+---------------------+
|  1 | retain   | 53F33F7E4741E7007000004B70001 | test    | NULL |    1 | www     | 2016-12-24 16:55:18 |
+----+----------+-------------------------------+---------+------+------+---------+---------------------+
1 rows in set (0.00 sec)

MySQL Acknowledgement Table

mqtt_acked stores acknowledgements from the clients:

DROP TABLE IF EXISTS `mqtt_acked`;
CREATE TABLE `mqtt_acked` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `clientid` varchar(200) DEFAULT NULL,
  `topic` varchar(200) DEFAULT NULL,
  `mid` int(200) DEFAULT NULL,
  `created` timestamp NULL DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `mqtt_acked_key` (`clientid`,`topic`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Enable MySQL Backend

./bin/emqx_ctl plugins load emqx_backend_mysql

PostgreSQL Backend

Config file: emqx_backend_pgsql.conf

Configure PostgreSQL Server

Connection pool of multiple PostgreSQL servers is supported:

## Pgsql Server
backend.pgsql.pool1.server = 127.0.0.1:5432

## Pgsql Pool Size
backend.pgsql.pool1.pool_size = 8

## Pgsql Username
backend.pgsql.pool1.username = root

## Pgsql Password
backend.pgsql.pool1.password = public

## Pgsql Database
backend.pgsql.pool1.database = mqtt

## Pgsql Ssl
backend.pgsql.pool1.ssl = false

## Max number of fetch offline messages. Without count limit if infinity
## backend.pgsql.max_returned_count = 500

## Time Range. Without time limit if infinity
## d - day
## h - hour
## m - minute
## s - second
## backend.pgsql.time_range = 2h

Configure PostgreSQL Persistence Hooks

## Client Connected Record
backend.pgsql.hook.client.connected.1    = {"action": {"function": "on_client_connected"}, "pool": "pool1"}

## Subscribe Lookup Record
backend.pgsql.hook.client.connected.2    = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}

## Client DisConnected Record
backend.pgsql.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}

## Lookup Unread Message QOS > 0
backend.pgsql.hook.session.subscribed.1  = {"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1"}

## Lookup Retain Message
backend.pgsql.hook.session.subscribed.2  = {"topic": "#", "action": {"function": "on_retain_lookup"}, "pool": "pool1"}

## Store Publish Message  QOS > 0
backend.pgsql.hook.message.publish.1     = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

## Store Retain Message
backend.pgsql.hook.message.publish.2     = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}

## Delete Retain Message
backend.pgsql.hook.message.publish.3     = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}

## Store Ack
backend.pgsql.hook.message.acked.1       = {"topic": "#", "action": {"function": "on_message_acked"}, "pool": "pool1"}

## Get offline messages
##  "offline_opts": Get configuration for offline messages
##  max_returned_count: Maximum number of offline messages get at a time
##  time_range: Get only messages in the current time range
## backend.pgsql.hook.session.subscribed.1  = {"topic": "#", "action": {"function": "on_message_fetch"}, "offline_opts": {"max_returned_count": 500, "time_range": "2h"}, "pool": "pool1"}

## If you need to store Qos0 messages, you can enable the following configuration
## Warning: When the following configuration is enabled, 'on_message_fetch' needs to be disabled, otherwise qos1, qos2 messages will be stored twice
## backend.pgsql.hook.message.publish.4     = {"topic": "#", "action": {"function": "on_message_store"}, "pool": "pool1"}

Description of PostgreSQL Persistence Hooks

hook topic action Description
client.connected   on_client_connected Store client connected state
client.connected   on_subscribe_lookup Subscribed topics
client.disconnected   on_client_disconnected Store client disconnected state
session.subscribed # on_message_fetch Fetch offline messages
session.subscribed # on_retain_lookup Lookup retained messages
message.publish # on_message_publish Store published messages
message.publish # on_message_retain Store retained messages
message.publish # on_retain_delete Delete retained messages
message.acked # on_message_acked Process ACK

SQL Parameters Description

hook Parameters Example (${name} represents available parameter)
client.connected clientid insert into conn(clientid) values(${clientid})
client.disconnected clientid insert into disconn(clientid) values(${clientid})
session.subscribed clientid, topic, qos insert into sub(topic, qos) values(${topic}, ${qos})
session.unsubscribed clientid, topic delete from sub where topic = ${topic}
message.publish msgid, topic, payload, qos, clientid insert into msg(msgid, topic) values(${msgid}, ${topic})
message.acked msgid, topic, clientid insert into ack(msgid, topic) values(${msgid}, ${topic})
message.delivered msgid, topic, clientid insert into delivered(msgid, topic) values(${msgid}, ${topic})

Configure 'action' with SQL

PostgreSQL backend supports SQL in 'action':

## After a client is connected to the EMQ X server, it executes a SQL command (multiple command also supported)
backend.pgsql.hook.client.connected.3 = {"action": {"sql": ["insert into conn(clientid) values(${clientid})"]}, "pool": "pool1"}

Create PostgreSQL DB

createdb mqtt -E UTF8 -e

Import PostgreSQL DB & Table Schema

\i etc/sql/emqx_backend_pgsql.sql

注解

DB name is free of choice

PostgreSQL Client Connection Table

mqtt_client stores client connection states:

CREATE TABLE mqtt_client(
  id SERIAL primary key,
  clientid character varying(100),
  state integer,
  node character varying(100),
  online_at timestamp,
  offline_at timestamp,
  created timestamp without time zone,
  UNIQUE (clientid)
);

Query a client's connection state:

select * from mqtt_client where clientid = ${clientid};

E.g., if client 'test' is online:

select * from mqtt_client where clientid = 'test';

 id | clientid | state | node             | online_at           | offline_at        | created
----+----------+-------+----------------+---------------------+---------------------+---------------------
  1 | test     | 1     | emqx@127.0.0.1 | 2016-11-15 09:40:40 | NULL                | 2016-12-24 09:40:22
(1 rows)

Client 'test' is offline:

select * from mqtt_client where clientid = 'test';

 id | clientid | state | nod            | online_at           | offline_at          | created
----+----------+-------+----------------+---------------------+---------------------+---------------------
  1 | test     | 0     | emqx@127.0.0.1 | 2016-11-15 09:40:40 | 2016-11-15 09:46:10 | 2016-12-24 09:40:22
(1 rows)

PostgreSQL Subscription Table

mqtt_sub stores subscriptions of clients:

CREATE TABLE mqtt_sub(
  id SERIAL primary key,
  clientid character varying(100),
  topic character varying(200),
  qos integer,
  created timestamp without time zone,
  UNIQUE (clientid, topic)
);

E.g., client 'test' subscribes to topic 'test_topic1' and 'test_topic2':

insert into mqtt_sub(clientid, topic, qos) values('test', 'test_topic1', 1);
insert into mqtt_sub(clientid, topic, qos) values('test', 'test_topic2', 2);

Query subscription of a client:

select * from mqtt_sub where clientid = ${clientid};

Query subscription of client 'test':

select * from mqtt_sub where clientid = 'test';

 id | clientId     | topic       | qos  | created
----+--------------+-------------+------+---------------------
  1 | test         | test_topic1 |    1 | 2016-12-24 17:09:05
  2 | test         | test_topic2 |    2 | 2016-12-24 17:12:51
(2 rows)

PostgreSQL Message Table

mqtt_msg stores MQTT messages:

CREATE TABLE mqtt_msg (
  id SERIAL primary key,
  msgid character varying(60),
  sender character varying(100),
  topic character varying(200),
  qos integer,
  retain integer,
  payload text,
  arrived timestamp without time zone
);

Query messages published by a client:

select * from mqtt_msg where sender = ${clientid};

Query messages published by client 'test':

select * from mqtt_msg where sender = 'test';

 id | msgid                         | topic    | sender | node | qos | retain | payload | arrived
----+-------------------------------+----------+--------+------+-----+--------+---------+---------------------
 1  | 53F98F80F66017005000004A60003 | hello    | test   | NULL |   1 |      0 | hello   | 2016-12-24 17:25:12
 2  | 53F98F9FE42AD7005000004A60004 | world    | test   | NULL |   1 |      0 | world   | 2016-12-24 17:25:45
(2 rows)

PostgreSQL Retained Message Table

mqtt_retain stores retained messages:

CREATE TABLE mqtt_retain(
  id SERIAL primary key,
  topic character varying(200),
  msgid character varying(60),
  sender character varying(100),
  qos integer,
  payload text,
  arrived timestamp without time zone,
  UNIQUE (topic)
);

Query retained messages:

select * from mqtt_retain where topic = ${topic};

Query retained messages with topic 'retain':

select * from mqtt_retain where topic = 'retain';

 id | topic    | msgid                         | sender  | node | qos  | payload | arrived
----+----------+-------------------------------+---------+------+------+---------+---------------------
  1 | retain   | 53F33F7E4741E7007000004B70001 | test    | NULL |    1 | www     | 2016-12-24 16:55:18
(1 rows)

PostgreSQL Acknowledgement Table

mqtt_acked stores acknowledgements from the clients:

CREATE TABLE mqtt_acked (
  id SERIAL primary key,
  clientid character varying(100),
  topic character varying(100),
  mid integer,
  created timestamp without time zone,
  UNIQUE (clientid, topic)
);

Enable PostgreSQL Backend

./bin/emqx_ctl plugins load emqx_backend_pgsql

MongoDB Backend

Config file: emqx_backend_mongo.conf

Configure MongoDB Server

Connection pool of multiple MongoDB servers is supported:

## MongoDB Server Pools
## Mongo Topology Type single|unknown|sharded|rs
backend.mongo.pool1.type = single

## If type rs, need config setname
## backend.mongo.pool1.rs_set_name = testrs

## Mongo Server 127.0.0.1:27017,127.0.0.2:27017...
backend.mongo.pool1.server = 127.0.0.1:27017

## MongoDB Pool Size
backend.mongo.pool1.c_pool_size = 8

## MongoDB Database
backend.mongo.pool1.database = mqtt

## Mongo User
## backend.mongo.pool1.login =  emqtt
## Mongo Password
## backend.mongo.pool1.password = emqtt

## MongoDB AuthSource
## Value: String
## Default: mqtt
## backend.mongo.pool1.auth_source = admin

## Whether to enable SSL connection.
##
## Value: true | false
## backend.mongo.pool1.ssl = false

## SSL keyfile.
##
## Value: File
## backend.mongo.pool1.keyfile =

## SSL certfile.
##
## Value: File
## backend.mongo.pool1.certfile =

## SSL cacertfile.
##
## Value: File
## backend.mongo.pool1.cacertfile =

# Value: unsafe | safe
## backend.mongo.pool1.w_mode = safe
## Value: master | slave_ok
## backend.mongo.pool1.r_mode = slave_ok

## Mongo Topology Options
## backend.mongo.topology.pool_size = 1
## backend.mongo.topology.max_overflow = 0
## backend.mongo.topology.overflow_ttl = 1000
## backend.mongo.topology.overflow_check_period = 1000
## backend.mongo.topology.local_threshold_ms = 1000
## backend.mongo.topology.connect_timeout_ms = 20000
## backend.mongo.topology.socket_timeout_ms = 100
## backend.mongo.topology.server_selection_timeout_ms = 30000
## backend.mongo.topology.wait_queue_timeout_ms = 1000
## backend.mongo.topology.heartbeat_frequency_ms = 10000
## backend.mongo.topology.min_heartbeat_frequency_ms = 1000
## Max number of fetch offline messages. Without count limit if infinity
## backend.mongo.max_returned_count = 500

## Time Range. Without time limit if infinity
## d - day
## h - hour
## m - minute
## s - second
## backend.mongo.time_range = 2h

Configure MongoDB Persistence Hooks

## Client Connected Record
backend.mongo.hook.client.connected.1    = {"action": {"function": "on_client_connected"}, "pool": "pool1"}

## Subscribe Lookup Record
backend.mongo.hook.client.connected.2    = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}

## Client DisConnected Record
backend.mongo.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}

## Lookup Unread Message QOS > 0
backend.mongo.hook.session.subscribed.1  = {"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1"}

## Lookup Retain Message
backend.mongo.hook.session.subscribed.2  = {"topic": "#", "action": {"function": "on_retain_lookup"}, "pool": "pool1"}

## Store Publish Message  QOS > 0, payload_format options mongo_json | plain_text
backend.mongo.hook.message.publish.1     = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1", "payload_format": "mongo_json"}

## Store Retain Message, payload_format options mongo_json | plain_text
backend.mongo.hook.message.publish.2     = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1", "payload_format": "mongo_json"}

## Delete Retain Message
backend.mongo.hook.message.publish.3     = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}

## Store Ack
backend.mongo.hook.message.acked.1       = {"topic": "#", "action": {"function": "on_message_acked"}, "pool": "pool1"}

## Get offline messages
##  "offline_opts": Get configuration for offline messages
##  max_returned_count: Maximum number of offline messages get at a time
##  time_range: Get only messages in the current time range
## backend.mongo.hook.session.subscribed.1  = {"topic": "#", "action": {"function": "on_message_fetch"}, "offline_opts": {"max_returned_count": 500, "time_range": "2h"}, "pool": "pool1"}

## If you need to store Qos0 messages, you can enable the following configuration
## Warning: When the following configuration is enabled, 'on_message_fetch' needs to be disabled, otherwise qos1, qos2 messages will be stored twice
## backend.mongo.hook.message.publish.4     = {"topic": "#", "action": {"function": "on_message_store"}, "pool": "pool1"}

Description of MongoDB Persistence Hooks

hook topic action Description
client.connected   on_client_connected Store client connected state
client.connected   on_subscribe_lookup Subscribed topics
client.disconnected   on_client_disconnected Store client disconnected state
session.subscribed # on_message_fetch Fetch offline messages
session.subscribed # on_retain_lookup Lookup retained messages
message.publish # on_message_publish Store published messages
message.publish # on_message_retain Store retained messages
message.publish # on_retain_delete Delete retained messages
message.acked # on_message_acked Process ACK

Create MongoDB DB & Collections

use mqtt
db.createCollection("mqtt_client")
db.createCollection("mqtt_sub")
db.createCollection("mqtt_msg")
db.createCollection("mqtt_retain")
db.createCollection("mqtt_acked")

db.mqtt_client.ensureIndex({clientid:1, node:2})
db.mqtt_sub.ensureIndex({clientid:1})
db.mqtt_msg.ensureIndex({sender:1, topic:2})
db.mqtt_retain.ensureIndex({topic:1})

注解

DB name is free of choice

MongoDB MQTT Client Collection

mqtt_client stores MQTT clients' connection states:

{
    clientid: string,
    state: 0,1, //0 disconnected 1 connected
    node: string,
    online_at: timestamp,
    offline_at: timestamp
}

Query client's connection state:

db.mqtt_client.findOne({clientid: ${clientid}})

E.g., if client 'test' is online:

db.mqtt_client.findOne({clientid: "test"})

{
    "_id" : ObjectId("58646c9bdde89a9fb9f7fb73"),
    "clientid" : "test",
    "state" : 1,
    "node" : "emqx@127.0.0.1",
    "online_at" : 1482976411,
    "offline_at" : null
}

Client 'test' is offline:

db.mqtt_client.findOne({clientid: "test"})

{
    "_id" : ObjectId("58646c9bdde89a9fb9f7fb73"),
    "clientid" : "test",
    "state" : 0,
    "node" : "emq@127.0.0.1",
    "online_at" : 1482976411,
    "offline_at" : 1482976501
}

MongoDB Subscription Collection

mqtt_sub stores subscriptions of clients:

{
    clientid: string,
    topic: string,
    qos: 0,1,2
}

E.g., client 'test' subscribes to topic 'test_topic1' and 'test_topic2':

db.mqtt_sub.insert({clientid: "test", topic: "test_topic1", qos: 1})
db.mqtt_sub.insert({clientid: "test", topic: "test_topic2", qos: 2})

Query subscription of client 'test':

db.mqtt_sub.find({clientid: "test"})

{ "_id" : ObjectId("58646d90c65dff6ac9668ca1"), "clientid" : "test", "topic" : "test_topic1", "qos" : 1 }
{ "_id" : ObjectId("58646d96c65dff6ac9668ca2"), "clientid" : "test", "topic" : "test_topic2", "qos" : 2 }

MongoDB Message Collection

mqtt_msg stores MQTT messages:

{
    _id: int,
    topic: string,
    msgid: string,
    sender: string,
    qos: 0,1,2,
    retain: boolean (true, false),
    payload: string,
    arrived: timestamp
}

Query messages published by a client:

db.mqtt_msg.find({sender: ${clientid}})

Query messages published by client 'test':

db.mqtt_msg.find({sender: "test"})
{
    "_id" : 1,
    "topic" : "/World",
    "msgid" : "AAVEwm0la4RufgAABeIAAQ==",
    "sender" : "test",
    "qos" : 1,
    "retain" : 1,
    "payload" : "Hello world!",
    "arrived" : 1482976729
}

MongoDB Retained Message Collection

mqtt_retain stores retained messages:

{
    topic: string,
    msgid: string,
    sender: string,
    qos: 0,1,2,
    payload: string,
    arrived: timestamp
}

Query retained messages:

db.mqtt_retain.findOne({topic: ${topic}})

Query retained messages with topic 'retain':

db.mqtt_retain.findOne({topic: "/World"})
{
    "_id" : ObjectId("58646dd9dde89a9fb9f7fb75"),
    "topic" : "/World",
    "msgid" : "AAVEwm0la4RufgAABeIAAQ==",
    "sender" : "c1",
    "qos" : 1,
    "payload" : "Hello world!",
    "arrived" : 1482976729
}

MongoDB Acknowledgement Collection

mqtt_acked stores acknowledgements from the clients:

{
    clientid: string,
    topic: string,
    mongo_id: int
}

Enable MongoDB Backend

./bin/emqx_ctl plugins load emqx_backend_mongo

Cassandra Backend

Config file: etc/plugins/emqx_backend_cassa.conf

Configure Cassandra Cluster

Multi node Cassandra cluster is supported:

## Cassandra Node
backend.ecql.pool1.nodes = 127.0.0.1:9042

## Cassandra Pool Size
backend.ecql.pool1.size = 8

## Cassandra auto reconnect flag
backend.ecql.pool1.auto_reconnect = 1

## Cassandra Username
backend.ecql.pool1.username = cassandra

## Cassandra Password
backend.ecql.pool1.password = cassandra

## Cassandra Keyspace
backend.ecql.pool1.keyspace = mqtt

## Cassandra Logger type
backend.ecql.pool1.logger = info

## Max number of fetch offline messages. Without count limit if infinity
## backend.cassa.max_returned_count = 500

## Time Range. Without time limit if infinity
## d - day
## h - hour
## m - minute
## s - second
## backend.cassa.time_range = 2h

Configure Cassandra Persistence Hooks

## Client Connected Record
backend.cassa.hook.client.connected.1    = {"action": {"function": "on_client_connected"}, "pool": "pool1"}

## Subscribe Lookup Record
backend.cassa.hook.client.connected.2    = {"action": {"function": "on_subscription_lookup"}, "pool": "pool1"}

## Client DisConnected Record
backend.cassa.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}

## Lookup Unread Message QOS > 0
backend.cassa.hook.session.subscribed.1  = {"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1"}

## Lookup Retain Message
backend.cassa.hook.session.subscribed.2  = {"action": {"function": "on_retain_lookup"}, "pool": "pool1"}

## Store Publish Message  QOS > 0
backend.cassa.hook.message.publish.1     = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

## Delete Acked Record
backend.cassa.hook.session.unsubscribed.1= {"topic": "#", action": {"cql": ["delete from acked where client_id = ${clientid} and topic = ${topic}"]}, "pool": "pool1"}

## Store Retain Message
backend.cassa.hook.message.publish.2     = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}

## Delete Retain Message
backend.cassa.hook.message.publish.3     = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}

## Store Ack
backend.cassa.hook.message.acked.1       = {"topic": "#", "action": {"function": "on_message_acked"}, "pool": "pool1"}

## Get offline messages
##  "offline_opts": Get configuration for offline messages
##  max_returned_count: Maximum number of offline messages get at a time
##  time_range: Get only messages in the current time range
## backend.cassa.hook.session.subscribed.1  = {"topic": "#", "action": {"function": "on_message_fetch"}, "offline_opts": {"max_returned_count": 500, "time_range": "2h"}, "pool": "pool1"}

## If you need to store Qos0 messages, you can enable the following configuration
## Warning: When the following configuration is enabled, 'on_message_fetch' needs to be disabled, otherwise qos1, qos2 messages will be stored twice
## backend.cassa.hook.message.publish.4     = {"topic": "#", "action": {"function": "on_message_store"}, "pool": "pool1"}

Description of Cassandra Persistence Hooks

hook topic action Description
client.connected   on_client_connected Store client connected state
client.connected   on_subscribe_lookup Subscribed topics
client.disconnected   on_client_disconnected Store client disconnected state
session.subscribed # on_message_fetch Fetch offline messages
session.subscribed # on_retain_lookup Lookup retained messages
message.publish # on_message_publish Store published messages
message.publish # on_message_retain Store retained messages
message.publish # on_retain_delete Delete retained messages
message.acked # on_message_acked Process ACK

CQL Parameters Description

Customized CQL command parameters includes:

hook Parameter Example (${name} in CQL represents available parameter
client.connected clientid insert into conn(clientid) values(${clientid})
client.disconnected clientid insert into disconn(clientid) values(${clientid})
session.subscribed clientid, topic, qos insert into sub(topic, qos) values(${topic}, ${qos})
session.unsubscribed clientid, topic delete from sub where topic = ${topic}
message.publish msgid, topic, payload, qos, clientid insert into msg(msgid, topic) values(${msgid}, ${topic})
message.acked msgid, topic, clientid insert into ack(msgid, topic) values(${msgid}, ${topic})
message.delivered msgid, topic, clientid insert into delivered(msgid, topic) values(${msgid}, ${topic})

Configure 'action' with CQL

Cassandra backend supports CLQ in 'action':

## After a client is connected to the EMQ X server, it executes a CQL command(multiple command also supported):
backend.cassa.hook.client.connected.3 = {"action": {"cql": ["insert into conn(clientid) values(${clientid})"]}, "pool": "pool1"}

Initializing Cassandra

Create KeySpace:

CREATE KEYSPACE mqtt WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
USR mqtt;

Import Cassandra tables:

cqlsh -e "SOURCE 'emqx_backend_cassa.cql'"

注解

KeySpace is free of choice

Cassandra Client Connection Table

mqtt.client stores client connection states:

CREATE TABLE mqtt.client (
    client_id text,
    node text,
    state int,
    connected timestamp,
    disconnected timestamp,
    PRIMARY KEY(client_id)
);

Query a client's connection state:

select * from mqtt.client where clientid = ${clientid};

If client 'test' is online:

select * from mqtt.client where clientid = 'test';

 client_id | connected                       | disconnected  | node          | state
-----------+---------------------------------+---------------+---------------+-------
      test | 2017-02-14 08:27:29.872000+0000 |          null | emqx@127.0.0.1|     1

Client 'test' is offline:

select * from mqtt.client where clientid = 'test';

 client_id | connected                       | disconnected                    | node          | state
-----------+---------------------------------+---------------------------------+---------------+-------
      test | 2017-02-14 08:27:29.872000+0000 | 2017-02-14 08:27:35.872000+0000 | emqx@127.0.0.1|     0

Cassandra Subscription Table

mqtt.sub stores subscriptions of clients:

CREATE TABLE mqtt.sub (
    client_id text,
    topic text,
    qos int,
    PRIMARY KEY(client_id, topic)
);

Client 'test' subscribes to topic 'test_topic1' and 'test_topic2':

insert into mqtt.sub(client_id, topic, qos) values('test', 'test_topic1', 1);
insert into mqtt.sub(client_id, topic, qos) values('test', 'test_topic2', 2);

Query subscriptions of a client:

select * from mqtt_sub where clientid = ${clientid};

Query subscriptions of client 'test':

select * from mqtt_sub where clientid = 'test';

 client_id | topic       | qos
-----------+-------------+-----
      test | test_topic1 |   1
      test | test_topic2 |   2

Cassandra Message Table

mqtt.msg stores MQTT messages:

CREATE TABLE mqtt.msg (
    topic text,
    msgid text,
    sender text,
    qos int,
    retain int,
    payload text,
    arrived timestamp,
    PRIMARY KEY(topic, msgid)
  ) WITH CLUSTERING ORDER BY (msgid DESC);

Query messages published by a client:

select * from mqtt_msg where sender = ${clientid};

Query messages published by client 'test':

select * from mqtt_msg where sender = 'test';

 topic | msgid                | arrived                         | payload      | qos | retain | sender
-------+----------------------+---------------------------------+--------------+-----+--------+--------
 hello | 2PguFrHsrzEvIIBdctmb | 2017-02-14 09:07:13.785000+0000 | Hello world! |   1 |      0 |   test
 world | 2PguFrHsrzEvIIBdctmb | 2017-02-14 09:07:13.785000+0000 | Hello world! |   1 |      0 |   test

Cassandra Retained Message Table

mqtt.retain stores retained messages:

CREATE TABLE mqtt.retain (
    topic text,
    msgid text,
    PRIMARY KEY(topic)
);

Query retained messages:

select * from mqtt_retain where topic = ${topic};

Query retained messages with topic 'retain':

select * from mqtt_retain where topic = 'retain';

 topic  | msgid
--------+----------------------
 retain | 2PguFrHsrzEvIIBdctmb

Cassandra Acknowledgement Table

mqtt.acked stores acknowledgements from the clients:

CREATE TABLE mqtt.acked (
    client_id text,
    topic text,
    msgid text,
    PRIMARY KEY(client_id, topic)
  );

Enable Cassandra Backend

./bin/emqx_ctl plugins load emqx_backend_cassa

DynamoDB Backend

Configure DynamoDB Cluster

Config file: etc/plugins/emqx_backend_dynamo.conf

## DynamoDB Region
backend.dynamo.region = us-west-2

## DynamoDB Server
backend.dynamo.pool1.server = http://localhost:8000

## DynamoDB Pool Size
backend.dynamo.pool1.pool_size = 8

## AWS ACCESS KEY ID
backend.dynamo.pool1.aws_access_key_id = AKIAU5IM2XOC7AQWG7HK

## AWS SECRET ACCESS KEY
backend.dynamo.pool1.aws_secret_access_key = TZt7XoRi+vtCJYQ9YsAinh19jR1rngm/hxZMWR2P

## DynamoDB Backend Hooks
backend.dynamo.hook.client.connected.1    = {"action": {"function": "on_client_connected"}, "pool": "pool1"}
backend.dynamo.hook.session.created.1     = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}
backend.dynamo.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
backend.dynamo.hook.session.subscribed.1  = {"topic": "#", "action": {"function": "on_message_fetch_for_queue"}, "pool": "pool1"}
backend.dynamo.hook.session.subscribed.2  = {"topic": "#", "action": {"function": "on_retain_lookup"}, "pool": "pool1"}
backend.dynamo.hook.session.unsubscribed.1= {"topic": "#", "action": {"function": "on_acked_delete"}, "pool": "pool1"}
backend.dynamo.hook.message.publish.1     = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
backend.dynamo.hook.message.publish.2     = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}
backend.dynamo.hook.message.publish.3     = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
backend.dynamo.hook.message.acked.1       = {"topic": "#", "action": {"function": "on_message_acked_for_queue"}, "pool": "pool1"}

# backend.dynamo.hook.message.publish.4   = {"topic": "#", "action": {"function": "on_message_store"}, "pool": "pool1"}

Description of DynamoDB Persistence Hooks

hook topic action Description
client.connected   on_client_connected Store client connected state
client.connected   on_subscribe_lookup Subscribed topics
client.disconnected   on_client_disconnected Store client disconnected state
session.subscribed # on_message_fetch_for_queue Fetch offline messages
session.subscribed # on_retain_lookup Lookup retained messages
message.publish # on_message_publish Store published messages
message.publish # on_message_retain Store retained messages
message.publish # on_retain_delete Delete retained messages
message.acked # on_message_acked_for_queue Process ACK

Create DynamoDB DB

./test/dynamo_test.sh

注解

DB name is free of choice

DynamoDB Client Connection Table

mqtt_client stores client connection states:

{
    "TableName": "mqtt_client",
    "KeySchema": [
        { "AttributeName": "clientid", "KeyType": "HASH" }
    ],
    "AttributeDefinitions": [
        { "AttributeName": "clientid", "AttributeType": "S" }
    ],
    "ProvisionedThroughput": {
        "ReadCapacityUnits": 5,
        "WriteCapacityUnits": 5
    }
}

Query the client connection state:

aws dynamodb scan --table-name mqtt_client --region us-west-2  --endpoint-url http://localhost:8000

{
    "Items": [
        {
            "offline_at": { "N": "0" },
            "node": { "S": "emqx@127.0.0.1" },
            "clientid": { "S": "mqttjs_384b9c73a9" },
            "connect_state": { "N": "1" },
            "online_at": { "N": "1562224940" }
        }
    ],
    "Count": 1,
    "ScannedCount": 1,
    "ConsumedCapacity": null
}

DynamoDB Subscription Table

mqtt_sub table stores MQTT subscriptions of clients:

{
    "TableName": "mqtt_sub",
    "KeySchema": [
        { "AttributeName": "clientid", "KeyType": "HASH" },
        { "AttributeName": "topic", "KeyType": "RANGE" }
    ],
    "AttributeDefinitions": [
        { "AttributeName": "clientid", "AttributeType": "S" },
        { "AttributeName": "topic", "AttributeType": "S" }
    ],
    "ProvisionedThroughput": {
        "ReadCapacityUnits": 5,
        "WriteCapacityUnits": 5
    }
}

Query topics subscribed by the client named "test-dynamo":

aws dynamodb scan --table-name mqtt_sub --region us-west-2  --endpoint-url http://localhost:8000

{
    "Items": [{"qos": { "N": "2" }, "topic": { "S": "test-dynamo-sub" }, "clientid": { "S": "test-dynamo" }},
               {"qos": { "N": "2" }, "topic": { "S": "test-dynamo-sub-1"}, "clientid": { "S": "test-dynamo" }},
               {"qos": { "N": "2" }, "topic": { "S": "test-dynamo-sub-2"}, "clientid": { "S": "test-dynamo" }}],
    "Count": 3,
    "ScannedCount": 3,
    "ConsumedCapacity": null
}

DynamoDB Message Table

mqtt_msg stores MQTT messages:

{
    "TableName": "mqtt_msg",
    "KeySchema": [
        { "AttributeName": "msgid", "KeyType": "HASH" }
    ],
    "AttributeDefinitions": [
        { "AttributeName": "msgid", "AttributeType": "S" }
    ],
    "ProvisionedThroughput": {
        "ReadCapacityUnits": 5,
        "WriteCapacityUnits": 5
    }
}

mqtt_topic_msg_map stores the mapping between topics and messages:

{
    "TableName": "mqtt_topic_msg_map",
    "KeySchema": [
        { "AttributeName": "topic", "KeyType": "HASH" }
    ],
    "AttributeDefinitions": [
        { "AttributeName": "topic", "AttributeType": "S" }
    ],
    "ProvisionedThroughput": {
        "ReadCapacityUnits": 5,
        "WriteCapacityUnits": 5
    }
}

Query mqtt_msg and mqtt_topic_msg_map after a client publishes a message to the "test" topic:

Query mqtt_msg:

 aws dynamodb scan --table-name mqtt_msg --region us-west-2  --endpoint-url http://localhost:8000

{
    "Items": [
        {
             "arrived": { "N": "1562308553" },
             "qos": { "N": "1" },
             "sender": { "S": "mqttjs_231b962d5c" },
             "payload": { "S": "{ \"msg\": \"Hello, World!\" }"},
             "retain": { "N": "0" },
             "msgid": { "S": "Mjg4MTk1MDYwNTk0NjYwNzYzMTg4MDk3OTQ2MDU2Nzg1OTD" },
             "topic": { "S": "test" }
        }
    ],
    "Count": 1,
    "ScannedCount": 1,
    "ConsumedCapacity": null
}

Query mqtt_topic_msg_map:

 aws dynamodb scan --table-name mqtt_topic_msg_map --region us-west-2  --endpoint-url http://localhost:8000

{
    "Items": [
        {
             "topic": { "S": "test" },
             "MsgId": { "SS": [ "Mjg4MTk1MDYwNTk0NjYwNzYzMTg4MDk3OTQ2MDU2Nzg1OTD" ]}
        }
    ],
    "Count": 1,
    "ScannedCount": 1,
    "ConsumedCapacity": null
}

DynamoDB Retained Message Table

mqtt_retain stores retained messages:

{
    "TableName": "mqtt_retain",
    "KeySchema": [
        { "AttributeName": "topic", "KeyType": "HASH" }
    ],
    "AttributeDefinitions": [
        { "AttributeName": "topic", "AttributeType": "S" }
    ],
    "ProvisionedThroughput": {
        "ReadCapacityUnits": 5,
        "WriteCapacityUnits": 5
    }
}

Query mqtt_retain after a client publishes a message to the "test" topic:

{
    "Items": [
        {
            "arrived": { "N": "1562312113" },
            "qos": { "N": "1" },
            "sender": { "S": "mqttjs_d0513acfce" },
            "payload": { "S": "test" },
            "retain": { "N": "1" },
            "msgid": { "S": "Mjg4MTk1NzE3MTY4MjYxMjA5MDExMDg0NTk5ODgzMjAyNTH" },
            "topic": { "S": "testtopic" }
        }
    ],
    "Count": 1,
    "ScannedCount": 1,
    "ConsumedCapacity": null
}

DynamoDB Acknowledgement Table

mqtt_acked stores acknowledgements from the clients:

{
    "TableName": "mqtt_acked",
    "KeySchema": [
        { "AttributeName": "topic", "KeyType": "HASH" },
        { "AttributeName": "clientid", "KeyType": "RANGE" }
    ],
    "AttributeDefinitions": [
        { "AttributeName": "topic", "AttributeType": "S" },
        { "AttributeName": "clientid", "AttributeType": "S" }
    ],
    "ProvisionedThroughput": {
        "ReadCapacityUnits": 5,
        "WriteCapacityUnits": 5
    }
}

Query mqtt_acked after a client publishes a message to the "test" topic:

{
    "Items": [
        {
            "topic": { "S": "test" },
            "msgid": { "S": "Mjg4MTk1MDYwNTk0NjYwNzYzMTg4MDk3OTQ2MDU2Nzg1OTD" },
            "clientid": { "S": "mqttjs_861e582a70" }
        }
    ],
    "Count": 1,
    "ScannedCount": 1,
    "ConsumedCapacity": null
}

Enable DynamoDB Backend

./bin/emqx_ctl plugins load emqx_backend_dynamo

InfluxDB Backend

Configure InfluxDB Server

Config file: etc/plugins/emqx_backend_influxdb.conf:

## InfluxDB UDP Server
backend.influxdb.pool1.server = 127.0.0.1:8089

## InfluxDB Pool Size
backend.influxdb.pool1.pool_size = 5

## Wether to add timestamp automatically
backend.influxdb.pool1.set_timestamp = true

backend.influxdb.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

Parameters in hook rule:

Option Description
topic Configure which topics need to execute hooks
action Configure specific action for hook, function is a built-in function provided as Backend for general functions
pool Pool Name, used to connect multiple InfluxDB servers

Example:

## Store PUBLISH message whose topic is "sensor/#"
backend.influxdb.hook.message.publish.1 = {"topic": "sensor/#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

## Store PUBLISH message whose topic is "stat/#"
backend.influxdb.hook.message.publish.2 = {"topic": "stat/#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

Description of InfluxDB Persistence Hooks

hook topic action Description
message.publish # on_message_publish Store published messages

Since MQTT Message cannot be written directly to InfluxDB, InfluxDB Backend provides an emqx_backend_influxdb.tmpl template file to convert MQTT Message to DataPoint that can be written to InfluxDB.

Template file use Json format:

  • key - MQTT Topic, Json String, support wildcard characters
  • value - Template, Json Object, used to convert MQTT Message into measurement,tag_key=tag_value,... field_key=field_value,... timestamp and write to InfluxDB。

You can define different templates for different topics or multiple templates for the same topic, likes:

{
    <Topic 1>: <Template 1>,
    <Topic 2>: <Template 2>
}

Template format:

{
    "measurement": <Measurement>,
    "tags": {
        <Tag Key>: <Tag Value>
    },
    "fields": {
        <Field Key>: <Field Value>
    },
    "timestamp": <Timestamp>
}

measurement and fields are required options, tags and timestamp are optional.

All values (such as <Measurement>) can be configured directly in the template as a fixed value that data types supported depending on the table you define. More realistically, of course, you can access the data in the MQTT message through the placeholder we provide.

Currently, we support placeholders as follows:

Placeholder Description
$id MQTT Message UUID, assigned by EMQ X
$clientid Client ID used by the Client
$username Username used by the Client
$peerhost IP of Client
$qos QoS of MQTT Message
$topic Topic of MQTT Message
$payload Payload of MQTT Message, must be valid Json data
$<Number> It must be used with $paylaod to retrieve data from Json Array
$timestamp The timestamp EMQ X sets when preparing to forward messages, precision: Nanoseconds

$payload and $<Number>:

You can directly use $content to obtain the complete message payload, you can use ["$payload", <Key>, ...] to get the data inside the message payload.

For example payload is {"data": {"temperature": 23.9}}, you can via ["$payload", "data", "temperature"] to get 23.9.

In the case of array data type in Json, we introduced $0 and $<pos_integer>, $0 means to get all elements in the array, and $<pos_integer> means to get the <pos_integer>th element in the array.

A simple example, ["$payload", "$0", "temp"] will get [20, 21] from [{"temp": 20}, {"temp": 21}], and ["$payload", "$1", "temp"] will only get 20.

It is worth noting that when you use $0, we expect the number of data you get is same. Because we need to convert these arrays into multiple records and write it into InfluxDB, and when you have three pieces of data in one field and two in another, we won't know how to combine the data for you.

Example

data/templates directory provides a sample template (emqx_backend_influxdb_example.tmpl, please remove the "_example" suffix from the filename when using it formally) for the user's reference:

{
    "sample": {
        "measurement": "$topic",
        "tags": {
            "host": ["$payload", "data", "$0", "host"],
            "region": ["$payload", "data", "$0", "region"],
            "qos": "$qos",
            "clientid": "$clientid"
        },
        "fields": {
            "temperature": ["$payload", "data", "$0", "temp"]
        },
        "timestamp": "$timestamp"
    }
}

When an MQTT Message whose Topic is "sample" has the following Payload:

{
    "data": [
        {
            "temp": 1,
            "host": "serverA",
            "region": "hangzhou"
        },
        {
            "temp": 2,
            "host": "serverB",
            "region": "ningbo"
        }
    ]
}

Backend converts MQTT messages to:

[
    {
        "measurement": "sample",
        "tags": {
            "clientid": "mqttjs_ebcc36079a",
            "host": "serverA",
            "qos": "0",
            "region": "hangzhou",
        },
        "fields": {
            "temperature": "1"
        },
        "timestamp": "1560743513626681000"
    },
    {
        "measurement": "sample",
        "tags": {
            "clientid": "mqttjs_ebcc36079a",
            "host": "serverB",
            "qos": "0",
            "region": "ningbo",
        },
        "fields": {
            "temperature": "2"
        },
        "timestamp": "1560743513626681000"
    }
]

The data was finally encoded and written to InfluxDB as follows:

"sample,clientid=mqttjs_6990f0e886,host=serverA,qos=0,region=hangzhou temperature=\"1\" 1560745505429670000\nsample,clientid=mqttjs_6990f0e886,host=serverB,qos=0,region=ningbo temperature=\"2\" 1560745505429670000\n"

Enable InfluxDB Backend

./bin/emqx_ctl plugins load emqx_backend_influxdb

OpenTSDB Backend

Configure OpenTSDB Server

Config file: etc/plugins/emqx_backend_opentsdb.conf:

## OpenTSDB Server
backend.opentsdb.pool1.server = 127.0.0.1:4242

## OpenTSDB Pool Size
backend.opentsdb.pool1.pool_size = 8

## Whether to return summary info
backend.opentsdb.pool1.summary = true

## Whether to return detailed info
##
## Value: true | false
backend.opentsdb.pool1.details = false

## Synchronous write or not
##
## Value: true | false
backend.opentsdb.pool1.sync = false

## Synchronous write timeout in milliseconds
##
## Value: Duration
##
## Default: 0
backend.opentsdb.pool1.sync_timeout = 0

## Max batch size
##
## Value: Number >= 0
## Default: 20
backend.opentsdb.pool1.max_batch_size = 20

## Store PUBLISH Messages
backend.opentsdb.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

Parameters in hook rule:

Option Description
topic Configure which topics need to execute hooks
action Configure specific action for hook, function is a built-in function provided as Backend for general functions
pool Pool Name, used to connect multiple OpenTSDB servers

Example:

## Store PUBLISH message whose topic is "sensor/#"
backend.influxdb.hook.message.publish.1 = {"topic": "sensor/#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

## Store PUBLISH message whose topic is "stat/#"
backend.influxdb.hook.message.publish.2 = {"topic": "stat/#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

Description of OpenTSDB Persistence Hooks

hook topic action Description
message.publish # on_message_publish Store published messages

Since MQTT Message cannot be written directly to OpenTSDB, OpenTSDB Backend provides an emqx_backend_opentsdb.tmpl template file to convert MQTT Message to DataPoint that can be written to OpenTSDB.

Template file use Json format:

  • key - MQTT Topic, Json String, support wildcard characters
  • value - Template, Json Object, used to convert MQTT Message into measurement,tag_key=tag_value,... field_key=field_value,... timestamp and write to InfluxDB。

You can define different templates for different topics or multiple templates for the same topic, likes:

{
    <Topic 1>: <Template 1>,
    <Topic 2>: <Template 2>
}

The template format is as follows:

{
    "measurement": <Measurement>,
    "tags": {
        <Tag Key>: <Tag Value>
    },
    "value": <Value>,
    "timestamp": <Timestamp>
}

measurement and value are required options, tags and timestamp are optional.

All values (such as <Measurement>) can be configured directly in the template as a fixed value that data types supported depending on the table you define. More realistically, of course, you can access the data in the MQTT message through the placeholder we provide.

Currently, we support placeholders as follows:

Placeholder Description
$id MQTT Message UUID, assigned by EMQ X
$clientid Client ID used by the Client
$username Username used by the Client
$peerhost IP of Client
$qos QoS of MQTT Message
$topic Topic of MQTT Message
$payload Payload of MQTT Message, must be valid Json data
$<Number> It must be used with $paylaod to retrieve data from Json Array
$timestamp The timestamp EMQ X sets when preparing to forward messages, precision: Nanoseconds

$payload and $<Number>:

You can directly use $content to obtain the complete message payload, you can use ["$payload", <Key>, ...] to get the data inside the message payload.

For example payload is {"data": {"temperature": 23.9}}, you can via ["$payload", "data", "temperature"] to get 23.9.

In the case of array data type in Json, we introduced $0 and $<pos_integer>, $0 means to get all elements in the array, and $<pos_integer> means to get the <pos_integer>th element in the array.

A simple example, ["$payload", "$0", "temp"] will get [20, 21] from [{"temp": 20}, {"temp": 21}], and ["$payload", "$1", "temp"] will only get 20.

It is worth noting that when you use $0, we expect the number of data you get is same. Because we need to convert these arrays into multiple records and write it into OpenTSDB, and when you have three pieces of data in one field and two in another, we won't know how to combine the data for you.

Example

data/templates directory provides a sample template (emqx_backend_opentsdb_example.tmpl, please remove the "_example" suffix from the filename when using it formally) for the user's reference:

{
    "sample": {
        "measurement": "$topic",
        "tags": {
            "host": ["$payload", "data", "$0", "host"],
            "region": ["$payload", "data", "$0", "region"],
            "qos": "$qos",
            "clientid": "$clientid"
        },
        "value": ["$payload", "data", "$0", "temp"],
        "timestamp": "$timestamp"
    }
}

When an MQTT Message whose Topic is "sample" has the following Payload:

{
    "data": [
        {
            "temp": 1,
            "host": "serverA",
            "region": "hangzhou"
        },
        {
            "temp": 2,
            "host": "serverB",
            "region": "ningbo"
        }
    ]
}

Backend converts MQTT messages into the following data and writes it to OpenTSDB:

[
    {
        "measurement": "sample",
        "tags": {
            "clientid": "mqttjs_ebcc36079a",
            "host": "serverA",
            "qos": "0",
            "region": "hangzhou",
        },
        "value": "1",
        "timestamp": "1560743513626681000"
    },
    {
        "measurement": "sample",
        "tags": {
            "clientid": "mqttjs_ebcc36079a",
            "host": "serverB",
            "qos": "0",
            "region": "ningbo",
        },
        "value": "2",
        "timestamp": "1560743513626681000"
    }
]

Enable OpenTSDB Backend

./bin/emqx_ctl plugins load emqx_backend_opentsdb

Timescale Backend

Configure Timescale Server

Config file: etc/plugins/emqx_backend_timescale.conf:

## Timescale Server
backend.timescale.pool1.server = 127.0.0.1:5432
## Timescale Pool Size
backend.timescale.pool1.pool_size = 8
## Timescale Username
backend.timescale.pool1.username = postgres
## Timescale Password
backend.timescale.pool1.password = password
## Timescale Database
backend.timescale.pool1.database = tutorial
## Timescale SSL
backend.timescale.pool1.ssl = false

## SSL keyfile.
##
## Value: File
## backend.timescale.pool1.keyfile =

## SSL certfile.
##
## Value: File
## backend.timescale.pool1.certfile =

## SSL cacertfile.
##
## Value: File
## backend.timescale.pool1.cacertfile =

## Store Publish Message
backend.timescale.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

Parameters in hook rule:

Option Description
topic Configure which topics need to execute hooks
action Configure specific action for hook, function is a built-in function provided as Backend for general functions
pool Pool Name, used to connect multiple Timescale servers

Example:

## Store PUBLISH message whose topic is "sensor/#"
backend.influxdb.hook.message.publish.1 = {"topic": "sensor/#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

## Store PUBLISH message whose topic is "stat/#"
backend.influxdb.hook.message.publish.2 = {"topic": "stat/#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

Description of Timescale Persistence Hooks

hook topic action Description
message.publish # on_message_publish Store published messages

Timescale Backend provides the template file named emqx_backend_timescale.tmpl, which is used to extract data from MQTT messages with different topics for writing to Timescale.

Template file use Json format:

  • key - MQTT Topic, Json String, support wildcard characters
  • value - Template, Json Object, used to convert MQTT Message into measurement,tag_key=tag_value,... field_key=field_value,... timestamp and write to InfluxDB。

You can define different templates for different topics or multiple templates for the same topic, likes:

{
    <Topic 1>: <Template 1>,
    <Topic 2>: <Template 2>
}

The template format is as follows:

{
    "name": <Name of template>,
    "sql": <SQL INSERT INTO>,
    "param_keys": <Param Keys>
}

name, sql and param_keys are required options.

name can be any string, just make sure there are no duplicates.

sql is SQL INSERT INTO statement for Timescale, like insert into sensor_data(time, location, temperature, humidity) values (NOW(), $1, $2, $3).

param_keys is a array, its first element corresponds to $1 appearing in sql and so on.

Any element in an array can be a fixed value, and the data type it supports depends on the table you define. More realistically, of course, you can access the data in the MQTT message through the placeholder we provide.

Currently, we support placeholders as follows:

Placeholder Description
$id MQTT Message UUID, assigned by EMQ X
$clientid Client ID used by the Client
$username Username used by the Client
$peerhost IP of Client
$qos QoS of MQTT Message
$topic Topic of MQTT Message
$payload Payload of MQTT Message, must be valid Json data
$<Number> It must be used with $paylaod to retrieve data from Json Array
$timestamp The timestamp EMQ X sets when preparing to forward messages, precision: Nanoseconds

$payload and $<Number>:

You can directly use $content to obtain the complete message payload, you can use ["$payload", <Key>, ...] to get the data inside the message payload.

For example payload is {"data": {"temperature": 23.9}}, you can via ["$payload", "data", "temperature"] to get 23.9.

In the case of array data type in Json, we introduced $0 and $<pos_integer>, $0 means to get all elements in the array, and $<pos_integer> means to get the <pos_integer>th element in the array.

A simple example, ["$payload", "$0", "temp"] will get [20, 21] from [{"temp": 20}, {"temp": 21}], and ["$payload", "$1", "temp"] will only get 20.

It is worth noting that when you use $0, we expect the number of data you get is same. Because we need to convert these arrays into multiple records and write it into Timescale, and when you have three pieces of data in one field and two in another, we won't know how to combine the data for you.

Example

data/templates directory provides a sample template (emqx_backend_timescale_example.tmpl, please remove the "_example" suffix from the filename when using it formally) for the user's reference:

{
    "sensor_data": {
        "name": "insert_sensor_data",
        "sql": "insert into sensor_data(time, location, temperature, humidity) values (NOW(), $1, $2, $3)",
        "param_keys": [
            ["$payload", "data", "$0", "location"],
            ["$payload", "data", "$0", "temperature"],
            ["$payload", "data", "$0", "humidity"]
        ]
    },
    "sensor_data2/#": {
        "name": "insert_sensor_data2",
        "sql": "insert into sensor_data(time, location, temperature, humidity) values (NOW(), $1, $2, $3)",
        "param_keys": [
            ["$payload", "location"],
            ["$payload", "temperature"],
            ["$payload", "humidity"]
        ]
    },
    "easy_data": {
        "name": "insert_easy_data",
        "sql": "insert into easy_data(time, data) values (NOW(), $1)",
        "param_keys": [
            "$payload"
        ]
    }
}

When an MQTT Message whose Topic is "sensor_data" has the following Payload:

{
    "data":[
        {
            "location":"bedroom",
            "temperature":21.3,
            "humidity":40.3
        },
        {
            "location":"bathroom",
            "temperature":22.3,
            "humidity":61.8
        },
        {
            "location":"kitchen",
            "temperature":29.5,
            "humidity":58.7
        }
    ]
}

["$payload", "data", "$0", "location"] will extract Payload from MQTT Message first.

If the format of Payload is json, backend continue to extract data from Payload.

And the value of data is an array, we use $0 to gets all elements in the array.

["$payload", "data", "$0", "location"] will help us get ["bedroom", "bathroom", "kitchen"] finally.

Accordingly if you replace $0 with $1, you get only ["bedroom"].

So in this scene, we will get the following SQL statement:

insert into sensor_data(time, location, temperature, humidity) values (NOW(), 'bedroom', 21.3, 40.3)
insert into sensor_data(time, location, temperature, humidity) values (NOW(), 'bathroom', 22.3, 61.8)
insert into sensor_data(time, location, temperature, humidity) values (NOW(), 'kitchen', 29.5, 58.7)

Eventually Timescale Backend executes these SQL statements to write data to Timescale.