User Guide¶
Suppose an EMQ X Cluster with two Linux nodes deployed on a cloud VPC network or a private network:
Node name | IP |
emqx1@192.168.0.10 | 192.168.0.10 |
emqx@192.168.0.20 | 192.168.0.20 |
System Parameters¶
Deployed under Linux, EMQ X allows 100k concurrent connections by default. To achieve this, the system Kernel, Networking, the Erlang VM and EMQ X itself must be tuned.
System-Wide File Handles¶
Maximum file handles:
# 2 millions system-wide
sysctl -w fs.file-max=262144
sysctl -w fs.nr_open=262144
echo 262144 > /proc/sys/fs/nr_open
Maximum file handles for current session:
ulimit -n 262144
/etc/sysctl.conf¶
Add 'fs.file-max' to '/etc/sysctl.conf' and make the changes permanent:
fs.file-max = 262144
/etc/security/limits.conf¶
Persist the maximum number of opened file handles for users in /etc/security/limits.conf:
emqx soft nofile 262144
emqx hard nofile 262144
Note: Under Ubuntu, '/etc/systemd/system.conf' is to be modified:
DefaultLimitNOFILE=262144
EMQ X Node Name¶
Set the node name and cookies(required by communication between nodes)
'/etc/emqx/emqx.conf' on emqx1:
node.name = emqx1@192.168.0.10
node.cookie = secret_dist_cookie
'/etc/emqx/emqx.conf' on emqx2:
node.name = emqx2@192.168.0.20
node.cookie = secret_dist_cookie
Start EMQ X Broker¶
The EMQ X Broker can be downloaded at: https://www.emqx.io/downloads#enterprise
After downloading the package, it can be installed or unzipped directly to start running. Taking the zip package for MacOS platform as an example:
unzip emqx-ee-macosx-v3.2.0.zip && cd emqx
# Start emqx
./bin/emqx start
# Check running status
./bin/emqx_ctl status
Clustering the EMQ X Nodes¶
Start the two nodes, then on the emqx1@192.168.0.10 run:
$ ./bin/emqx_ctl cluster join emqx2@192.168.0.20
Join the cluster successfully.
Cluster status: [{running_nodes,['emqx1@192.168.0.10','emqx@192.168.0.20']}]
or, on the emqx1@192.168.0.20 run:
$ ./bin/emqx_ctl cluster join emqx1@192.168.0.10
Join the cluster successfully.
Cluster status: [{running_nodes,['emqx1@192.168.0.10','emqx@192.168.0.20']}]
Check the cluster status on any node:
$ ./bin/emqx_ctl cluster status
Cluster status: [{running_nodes,['emqx1@192.168.0.10','emqx@192.168.0.20']}]
The default TCP ports used by the EMQ X message server include:
1883 | MQTT protocol port |
8883 | MQTT/SSL port |
8083 | MQTT/WebSocket port |
8080 | HTTP API port |
18083 | Dashboard Management Console Port |
MQTT publish and subscription¶
EMQ X Broker is a lightweight publish-subscribe message broker designed for the mobile Internet and the IoT, it currently supports MQTT v3.1.1 <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html>`_ and v5.0:
After EMQ X is started, devices and clients can connect to the broker through the MQTT protocol and exchange messages via Publish/Subscribe.
MQTT client library: https://github.com/mqtt/mqtt.github.io/wiki/libraries
E.g., using mosquitto_sub/pub CLI to subscribe to topics and publish messages.
mosquitto_sub -h 127.0.0.1 -p 1883 -t topic -q 2
mosquitto_pub -h 127.0.0.1 -p 1883 -t topic -q 1 -m "Hello, MQTT!"
Authentication and Access Control¶
EMQ X Broker provides Connection Authentication and Access Control using a series of authentication plugins, whose name conforms to the pattern of emqx_auth_<name>.
In EMQ X, these two functions are:
- Connection authentication: EMQ X verifies whether the client on each connection has access to the system. If not, it disconnects the connection
- Access Control: EMQ X verifies the permissions of each Publish/Subscribe action of a client, and allows/denies the corresponding action
Authentication¶
EMQ X Message Broker's Authentication is provided by a series of authentication plugins. It supports authentication by username, password, ClientID or anonymous.
By default, anonymous authentication is enabled. Multiple authentication modules can be started by loading the corresponding authentication plug-ins and forming an authentication chain:
** Start anonymous authentication**
Modify the etc/emqx.conf
file to enable anonymous authentication:
## Allow anonymous access
## Value: true | false
allow_anonymous = true
Access Control List¶
EMQ X Broker implements MQTT client access control through an ACL (Access Control List).
ACL access control rule definition:
Allow|Deny Identity Subscribe|Publish Topics
When an MQTT client initiates a subscribe/publish request, the access control module of EMQ X Broker will match the ACL rules one by one until the match is successful:
Default access control settings
EMQ X Message Broker's default access control can be set in file etc/emqx.conf
:
## Set whether to allow access when all ACL rules cannot match
## Value: allow | deny
acl_nomatch = allow
## Set the default file for storing ACL rules
## Value: File Name
acl_file = etc/acl.conf
The ACL rules are defined in file etc/acl.conf
, which is loaded into memory when EMQ X starts:
%% Aallows 'dashboard' users to subscribe to '$SYS/#'
{allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}.
%% Allows local user to publish and subscribe to all topics
{allow, {ipaddr, "127.0.0.1"}, pubsub, ["$SYS/#", "#"]}.
%% Deny all the users to subscribe to '$SYS/#' and '#' topics except local users
{deny, all, subscribe, ["$SYS/#", {eq, "#"}]}.
%% Allows any situation other than the above rules
{allow, all}.
The authentication plugins provided by EMQ X include:
plugins | description |
---|---|
emqx_auth_clientid | ClientId authentication plugin |
emqx_auth_username | username and password authentication plugin |
emqx_auth_jwt | JWT authentication plugin |
emqx_auth_ldap | LDAP authentication plugin |
emqx_auth_http | HTTP authentication plugin |
emqx_auth_mysql | MySQ Lauthentication plugin |
emqx_auth_pgsql | Postgre authentication plugin |
emqx_auth_redis | Redis authentication plugin |
emqx_auth_mongo | MongoDB authentication plugin |
For the configuration and usage of each authentication plug-in, please refer to authentication section of the Plugins.
注解
Multiple auth plug-ins can be started at the same time. The plug-in that starts first checks first.
In addition, EMQ X also supports the use of PSK (Pre-shared Key) for authentication. However, the authentication chain mentioned above is not used in this case. The verification is done during the SSL handshake. For details please refer to Pre-shared Key and emqx_psk_file
Bridge¶
Bridging two EMQ X Nodes¶
The concept of bridging is that EMQ X forwards messages of some of its topics to another MQTT Broker in some way.
Difference between Bridge and cluster is that bridge does not replicate topic trees and routing tables, a bridge only forwards MQTT messages based on bridging rules.
Currently the bridging methods supported by EMQ X are as follows:
- RPC bridge: RPC Bridge only supports message forwarding and does not support subscribing to the topic of remote nodes to synchronize data.
- MQTT Bridge: MQTT Bridge supports both forwarding and data synchronization through subscription topic
The concept is shown below:
In addition, the EMQ X supports multi-node bridge mode interconnection:
In EMQ X, bridge is configured by modifying etc/plugins/emqx_bridge_mqtt.conf
. EMQ X distinguishes between different bridges based on different names. E.g:
## Bridge address: node name for local bridge, host:port for remote.
bridge.mqtt.aws.address = 127.0.0.1:1883
This configuration declares a bridge named aws
and specifies that it is bridged to the MQTT server of 127.0.0.1:1883 by MQTT mode.
In case of creating multiple bridges, it is convenient to replicate all configuration items of the first bridge, and modify the bridge name and other configuration items if necessary (such as bridge.mqtt.$name.address, where $name refers to the name of bridge)
The next two sections describe how to create a bridge in RPC and MQTT mode respectively and create a forwarding rule that forwards the messages from sensors. Assuming that two EMQ X nodes are running on two hosts:
Name | Node | MQTT port |
emqx1 | emqx1@192.168.1.1 | 1883 |
emqx2 | emqx2@192.168.1.2 | 1883 |
EMQ X Node RPC Bridge Configuration¶
The following is the basic configuration of RPC bridging. The simplest RPC bridging only needs to configure the following three items:
## Bridge Address: Use node name (nodename@host) for rpc bridging, and host:port for mqtt connection
bridge.mqtt.aws.address = emqx2@192.168.1.2
## Forwarding topics of the message
bridge.mqtt.aws.forwards = sensor1/#,sensor2/#
## bridged mountpoint
bridge.mqtt.aws.mountpoint = bridge/emqx2/${node}/
If the message received by the local emqx1 node matches the topic sersor1/#
or sensor2/#
, these messages will be forwarded to the sensor1/#
or sensor2/#
topic of the remote emqx2 node.
forwards
is used to specify topics. Messages of the in forwards
specified topics on local node are forwarded to the remote node.
mountpoint
is used to add a topic prefix when forwarding a message. To use mountpoint
, the forwards
directive must be set. In the above example, a message with the topic sensor1/hello received by the local node will be forwarded to the remote node with the topic bridge/emqx2/emqx1@192.168.1.1/sensor1/hello
.
Limitations of RPC bridging:
- The RPC bridge of emqx can only forward local messages to the remote bridge node, and cannot synchronize the messages of the remote bridge node to the local node;
- RPC bridge can only bridge two EMQ X together and cannot bridge EMQ X to other mqtt brokers.
EMQ X Node MQTT Bridge Configuration¶
EMQ X 3.0 officially introduced MQTT bridge, so that EMQ X can bridge any MQTT broker. Because of the characteristics of the MQTT protocol, EMQ X can subscribe to the remote mqtt broker's topic through MQTT bridge, and then synchronize the remote MQTT broker's message to the local.
EMQ X MQTT bridging principle: Create an MQTT client on the EMQ X broker, and connect this MQTT client to the remote MQTT broker. Therefore, in the MQTT bridge configuration, following fields may be set for the EMQ X to connect to the remote broker as an mqtt client:
## Bridge Address: Use node name for rpc bridging, use host:port for mqtt connection
bridge.mqtt.aws.address = 192.168.1.2:1883
## Bridged Protocol Version
## Enumeration value: mqttv3 | mqttv4 | mqttv5
bridge.mqtt.aws.proto_ver = mqttv4
## mqtt client's client_id
bridge.mqtt.aws.client_id = bridge_emq
## mqtt client's clean_start field
## Note: Some MQTT Brokers need to set the clean_start value as `true`
bridge.mqtt.aws.clean_start = true
## mqtt client's username field
bridge.mqtt.aws.username = user
## mqtt client's password field
bridge.mqtt.aws.password = passwd
## Whether the mqtt client uses ssl to connect to a remote serve or not
bridge.mqtt.aws.ssl = off
## CA Certificate of Client SSL Connection (PEM format)
bridge.mqtt.aws.cacertfile = etc/certs/cacert.pem
## SSL certificate of Client SSL connection
bridge.mqtt.aws.certfile = etc/certs/client-cert.pem
## Key file of Client SSL connection
bridge.mqtt.aws.keyfile = etc/certs/client-key.pem
## SSL encryption
bridge.mqtt.aws.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384
## TTLS PSK password
## Note 'listener.ssl.external.ciphers' and 'listener.ssl.external.psk_ciphers' cannot be configured at the same time
##
## See 'https://tools.ietf.org/html/rfc4279#section-2'.
## bridge.mqtt.aws.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA
## Client's heartbeat interval
bridge.mqtt.aws.keepalive = 60s
## Supported TLS version
bridge.mqtt.aws.tls_versions = tlsv1.2,tlsv1.1,tlsv1
## Forwarding topics of the message
bridge.mqtt.aws.forwards = sensor1/#,sensor2/#
## Bridged mountpoint
bridge.mqtt.aws.mountpoint = bridge/emqx2/${node}/
## Subscription topic for bridging
bridge.mqtt.aws.subscription.1.topic = cmd/topic1
## Subscription qos for bridging
bridge.mqtt.aws.subscription.1.qos = 1
## Subscription topic for bridging
bridge.mqtt.aws.subscription.2.topic = cmd/topic2
## Subscription qos for bridging
bridge.mqtt.aws.subscription.2.qos = 1
## Bridging reconnection interval
## Default: 30s
bridge.mqtt.aws.reconnect_interval = 30s
## QoS1 message retransmission interval
bridge.mqtt.aws.retry_interval = 20s
## Inflight Size.
bridge.mqtt.aws.max_inflight_batches = 32
EMQ X Bridge Cache Configuration¶
The bridge of EMQ X has a message caching mechanism. The caching mechanism is applicable to both RPC bridging and MQTT bridging. When the bridge is disconnected (such as when the network connection is unstable), the messages with a topic specified in forwards
can be cached to the local message queue. Until the bridge is restored, these messages are re-forwarded to the remote node. The configuration of the cache queue is as follows:
## emqx_bridge internal number of messages used for batch
bridge.mqtt.aws.queue.batch_count_limit = 32
## emqx_bridge internal number of message bytes used for batch
bridge.mqtt.aws.queue.batch_bytes_limit = 1000MB
## The path for placing replayq queue. If the item is not specified in the configuration, then replayq will run in `mem-only` mode and messages will not be cached on disk.
bridge.mqtt.aws.queue.replayq_dir = data/emqx_emqx2_bridge/
## Replayq data segment size
bridge.mqtt.aws.queue.replayq_seg_bytes = 10MB
bridge.emqx2.queue.replayq_dir
is a configuration parameter for specifying the path of the bridge storage queue.
bridge.mqtt.aws.queue.replayq_seg_bytes
is used to specify the size of the largest single file of the message queue that is cached on disk. If the message queue size exceeds the specified value, a new file is created to store the message queue.
CLI for EMQ X Bridge¶
CLI for EMQ X Bridge:
$ cd emqx1/ && ./bin/emqx_ctl bridges
bridges list # List bridges
bridges start <Name> # Start a bridge
bridges stop <Name> # Stop a bridge
bridges forwards <Name> # Show a bridge forward topic
bridges add-forward <Name> <Topic> # Add bridge forward topic
bridges del-forward <Name> <Topic> # Delete bridge forward topic
bridges subscriptions <Name> # Show a bridge subscriptions topic
bridges add-subscription <Name> <Topic> <Qos> # Add bridge subscriptions topic
List all bridge states
$ ./bin/emqx_ctl bridges list
name: emqx status: Stopped
Start the specified bridge
$ ./bin/emqx_ctl bridges start emqx
Start bridge successfully.
Stop the specified bridge
$ ./bin/emqx_ctl bridges stop emqx
Stop bridge successfully.
List the forwarding topics for the specified bridge
$ ./bin/emqx_ctl bridges forwards emqx
topic: topic1/#
topic: topic2/#
Add a forwarding topic for the specified bridge
$ ./bin/emqx_ctl bridges add-forwards emqx topic3/#
Add-forward topic successfully.
Delete the forwarding topic for the specified bridge
$ ./bin/emqx_ctl bridges del-forwards emqx topic3/#
Del-forward topic successfully.
List subscriptions for the specified bridge
$ ./bin/emqx_ctl bridges subscriptions emqx
topic: cmd/topic1, qos: 1
topic: cmd/topic2, qos: 1
Add a subscription topic for the specified bridge
$ ./bin/emqx_ctl bridges add-subscription emqx cmd/topic3 1
Add-subscription topic successfully.
Delete the subscription topic for the specified bridge
$ ./bin/emqx_ctl bridges del-subscription emqx cmd/topic3
Del-subscription topic successfully.
Note: In case of creating multiple bridges, it is convenient to replicate all configuration items of the first bridge, and modify the bridge name and other configuration items if necessary.
HTTP Publish API¶
The EMQ X message server provides an HTTP publish interface through which an application server or web server can publish MQTT messages:
HTTP POST http://host:8080/api/v3/mqtt/publish
Web servers such as PHP/Java/Python/NodeJS or Ruby on Rails can publish MQTT messages via HTTP POST requests:
curl -v --basic -u user:passwd -H "Content-Type: application/json" -d \
'{"qos":1, "retain": false, "topic":"world", "payload":"test" , "client_id": "C_1492145414740"}' \-k http://localhost:8080/api/v3/mqtt/publish
HTTP interface parameters:
parameter | description |
---|---|
client_id | MQTT client ID |
qos | QoS: 0 | 1 | 2 |
retain | Retain: true | false |
topic | Topic |
payload | message payload |
注解
HTTP publishing interface uses authentication of Basic . The user and password in the above example are from the AppId and password in the Applications of Dashboard.
MQTT WebSocket Connection¶
EMQ X also supports WebSocket connections, web browsers or applications can connect directly to the broker via WebSocket:
WebSocket URI: | ws(s)://host:8083/mqtt |
Sec-WebSocket-Protocol: | 'mqttv3.1' or 'mqttv3.1.1' |
The Dashboard plugin provides a test tool for an MQTT WebSocket connection:
http://127.0.0.1:18083/#/websocket
$SYS - System topic¶
The EMQ X Broker periodically publishes its running status, message statistics, client online and offline events to the system topic starting with $SYS/
.
The $SYS
topic path begins with $SYS/brokers/{node}/
. {node}
is the name of the node where the event/message is generated, for example:
$SYS/brokers/emqx@127.0.0.1/version
$SYS/brokers/emqx@127.0.0.1/uptime
注解
By default, only the MQTT client on localhost is allowed to subscribe to the $SYS topic, this can be changed by modifying the access control rules in file etc/acl.config
.
$SYS system message publish interval is configured in etc/emqx.conf
:
## System interval of publishing $SYS messages.
##
## Value: Duration
## Default: 1m, 1 minute
broker.sys_interval = 1m
Cluster status information¶
Topic | Description |
---|---|
$SYS/brokers | cluster node list |
$SYS/brokers/${node}/version | EMQ X broker version |
$SYS/brokers/${node}/uptime | EMQ X broker startup time |
$SYS/brokers/${node}/datetime | EMQ X broker time |
$SYS/brokers/${node}/sysdescr | EMQ X broker Description |
Client Online and Offline Events¶
$SYS topic prefix: $SYS/brokers/${node}/clients/
Topic | Description |
---|---|
${clientid}/connected | Online event. This message is published when a client goes online. |
${clientid}/disconnected | Offline event. This message is published when a client is offline |
The Payload of the 'connected' event message can be parsed into JSON format:
{
"clientid":"id1",
"username":"u",
"ipaddress":"127.0.0.1",
"connack":0,
"ts":1554047291,
"proto_ver":3,
"proto_name":"MQIsdp",
"clean_start":true,
"keepalive":60
}
The Payload of the 'disconnected' event message can be parsed into JSON format:
{
"clientid":"id1",
"username":"u",
"reason":"normal",
"ts":1554047291
}
Statistics¶
System topic prefix: $SYS/brokers/${node}/stats/
Client statistics¶
Topic | Description |
connections/count | Total number of current clients |
connections/max | Maximum number of clients |
Session statistics¶
Topic | Description |
sessions/count | Total number of current sessions |
sessions/max | maximum number of sessions |
sessions/persistent/count | Total number of persistent sessions |
sessions/persistent/max | maximum number of persistent sessions |
Subscription statistics¶
Topic | Description |
suboptions/count | number of current subscription options |
suboptions/max | total number of maximum subscription options |
subscribers/max | total number of maximum subscribers |
subscribers/count | number of current subscribers |
subscriptions/max | maximum number of subscriptions |
subscriptions/count | total number of current subscription |
subscriptions/shared/count | total number of current shared subscriptions |
subscriptions/shared/max | maximum number of shared subscriptions |
Topic statistics¶
Topic | Description |
topics/count | total number of current topics |
topics/max | maximum number of topics |
Routes statistics¶
Topic | Description |
routes/count | total number of current Routes |
routes/max | maximum number of Routes |
注解
The topics/count and topics/max are numerically equal to routes/count and routes/max.
Throughput (bytes/packets/message) statistics¶
System Topic Prefix: $SYS/brokers/${node}/metrics/
sent and received bytes statistics¶
Topic | Description |
bytes/received | Accumulated received bytes |
bytes/sent | Accumulated sent bytes |
sent and received MQTT packets statistics¶
Topic | Description |
packets/received | Accumulative received MQTT packets |
packets/sent | Accumulative sent MQTT packets |
packets/connect | Accumulative received packets of MQTT CONNECT |
packets/connack | Accumulative sent packets of MQTT CONNACK |
packets/publish/received | Accumulative received packets of MQTT PUBLISH |
packets/publish/sent | Accumulative sent packets of MQTT PUBLISH |
packets/puback/received | Accumulative received packets of MQTT PUBACK |
packets/puback/sent | Accumulative sent packets of MQTT PUBACK |
packets/puback/missed | Accumulative missed packets of MQTT PUBACK |
packets/pubrec/received | Accumulative received packets of MQTT PUBREC |
packets/pubrec/sent | Accumulative sent packets of MQTT PUBREC |
packets/pubrec/missed | Accumulative missed packets of MQTT PUBREC |
packets/pubrel/received | Accumulative received packets of MQTT PUBREL |
packets/pubrel/sent | Accumulative sent packets of MQTT PUBREL |
packets/pubrel/missed | Accumulative missed packets of MQTT PUBREL |
packets/pubcomp/received | Accumulative received packets of MQTT PUBCOMP |
packets/pubcomp/sent | Accumulative sent packets of MQTT PUBCOMP |
packets/pubcomp/missed | Accumulative missed packets of MQTT PUBCOMP |
packets/subscribe | Accumulative received packets of MQTT SUBSCRIBE |
packets/suback | Accumulative sent packets of MQTT SUBACK |
packets/unsubscribe | Accumulative received packets of MQTT UNSUBSCRIBE |
packets/unsuback | Accumulative sent packets of MQTT UNSUBACK |
packets/pingreq | Accumulative received packets of MQTT PINGREQ |
packets/pingresp | Accumulative sent packets of MQTT PINGRESP |
packets/disconnect/received | Accumulative received packets of MQTT DISCONNECT |
packets/disconnect/sent | Accumulative sent packets of MQTT MQTT DISCONNECT |
packets/auth | Accumulative received packets of MQTT Auth |
MQTT sent and received messages statistics¶
Topic | Description |
messages/received | Accumulative received messages |
messages/sent | Accumulative sent messages |
messages/expired | Accumulative expired messages |
messages/retained | Accumulative retained messages |
messages/dropped | Total number of dropped messages |
messages/forward | Total number of messages forwarded by the node |
messages/qos0/received | Accumulative received messages of QoS0 |
messages/qos0/sent | Accumulative sent messages of QoS0 |
messages/qos1/received | Accumulative received messages QoS1 |
messages/qos1/sent | Accumulative sent messages QoS1 |
messages/qos2/received | Accumulative received messages of QoS2 |
messages/qos2/sent | Accumulative sent messages of QoS2 |
messages/qos2/expired | Total number of expired messages of QoS2 |
messages/qos2/dropped | Total number of dropped messages of QoS2 |
Alarms - system alarms¶
System Topic Prefix: $SYS/brokers/${node}/alarms/
Topic | Description |
alert | newly generated alarm |
clear | cleared alarm |
Sysmon - system monitoring¶
System Topic Prefix: $SYS/brokers/${node}/sysmon/
Topic | Description |
long_gc | GC Overtime alarm |
long_schedule | Alarm for Excessive Scheduling Time |
large_heap | ALarm for Heap Memory Occupancy |
busy_port | Alarm for Port busy |
busy_dist_port | Alarm for Dist Port busy |
Trace¶
EMQ X message server supports tracing all messages from a client or published to a topic.
Trace messages from the client:
$ ./bin/emqx_ctl log primary-level debug
$ ./bin/emqx_ctl trace start client "clientid" "trace_clientid.log" debug
Trace messages published to a topic:
$ ./bin/emqx_ctl log primary-level debug
$ ./bin/emqx_ctl trace start topic "t/#" "trace_topic.log" debug
Query trace:
$ ./bin/emqx_ctl trace list
Stop trace:
$ ./bin/emqx_ctl trace stop client "clientid"
$ ./bin/emqx_ctl trace stop topic "topic"