Cluster

Distributed Erlang

Erlang / OTP was originally a programming language platform designed by Ericsson for the development of telecommunication equipment systems. Telecommunication equipment (routers, access gateways) is typically a distributed system that connects the main control board and multiple business boards through the backplane.

Nodes and distributed Erlang

The distributed programs of the Erlang / OTP language platform are composed of distributed interconnected Erlang runtime systems. Each Erlang runtime system is called a node. Nodes are interconnected by TCP to form a network structure.

Erlang nodes are identified by a unique node name, which consists of two parts separated by @:

  <name>@<ip-address>

Communication between nodes is addressed by node name. For example, start four shell terminals locally, and then use the -name parameter to start four Erlang nodes respectively:

  erl -name node1@127.0.0.1 -setcookie my_nodes
  erl -name node2@127.0.0.1 -setcookie my_nodes
  erl -name node3@127.0.0.1 -setcookie my_nodes
  erl -name node4@127.0.0.1 -setcookie my_nodes

Use node(). to view the name of the current node, and nodes(). to view the other nodes that have established a connection with it. We now go to the console of 'node1@127.0.0.1' and check the current node name and connected nodes:

  (node1@127.0.0.1) 4> node().
  'node1@127.0.0.1'
  (node1@127.0.0.1) 4> nodes().
  []

Then we let node1 initiate connections with other nodes:

  (node1@127.0.0.1) 1> net_kernel:connect_node('node2@127.0.0.1').
  true
  (node1@127.0.0.1) 2> net_kernel:connect_node('node3@127.0.0.1').
  true
  (node1@127.0.0.1) 3> net_kernel:connect_node('node4@127.0.0.1').
  true

Now we can check other nodes that are already connected to node1:

  (node1@127.0.0.1) 4> nodes().
  ['node2@127.0.0.1','node3@127.0.0.1','node4@127.0.0.1']

We can see that node2, node3, and node4 have established a distributed connection with node1, and these four nodes form a cluster. Note that whenever a new node joins the cluster, it will establish a TCP connection with all the nodes in the cluster. At this point, the four nodes have completed the mesh structure shown in the following figure:

image
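
To make the size of such a mesh concrete, here is a minimal Python sketch that enumerates one connection per unordered pair of nodes. The helper name mesh_links is hypothetical and exists only for illustration; it models the counting, not the Erlang distribution protocol itself.

```python
from itertools import combinations


def mesh_links(nodes):
    """Return every unordered node pair, i.e. one TCP connection per pair."""
    return list(combinations(sorted(nodes), 2))


nodes = [
    "node1@127.0.0.1",
    "node2@127.0.0.1",
    "node3@127.0.0.1",
    "node4@127.0.0.1",
]
links = mesh_links(nodes)
for a, b in links:
    print(a, "<->", b)
print(len(links))  # 6 connections for 4 nodes: n * (n - 1) / 2
```

The number of connections grows quadratically with the number of nodes, which is worth keeping in mind when sizing a cluster.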

Security

Cookies are used for interconnection authentication between Erlang nodes. A cookie is a string, and two nodes can establish a connection only if they have the same cookie. In the previous section, we used the -setcookie my_nodes parameter to set the same cookie, my_nodes, on all four nodes.

See http://erlang.org/doc/reference_manual/distributed.html for details.

Using TLS for backplane connections

It is possible to enable TLS encryption for the backplane connections. It comes at the cost of increased CPU load, though.

  1. Create a root CA using openssl tool:

    # Create self-signed root CA:
    openssl req -nodes -x509 -sha256 -days 1825 -newkey rsa:2048 -keyout rootCA.key -out rootCA.pem -subj "/O=LocalOrg/CN=LocalOrg-Root-CA"
  2. Generate CA-signed certificates for the nodes using the rootCA.pem created at step 1:

    # Create a private key:
    openssl genrsa -out domain.key 2048
    # Create openssl extfile:
    cat <<EOF > domain.ext
    authorityKeyIdentifier=keyid,issuer
    basicConstraints=CA:FALSE
    subjectAltName = @alt_names
    [alt_names]
    DNS.1 = backplane
    EOF
    # Create a CSR:
    openssl req -key domain.key -new -out domain.csr -subj "/O=LocalOrg"
    # Sign the CSR with the Root CA:
    openssl x509 -req -CA rootCA.pem -CAkey rootCA.key -in domain.csr -out domain.pem -days 365 -CAcreateserial -extfile domain.ext

    All the nodes in the cluster must use certificates signed by the same CA.

  3. Put the generated domain.pem, domain.key and rootCA.pem files into /var/lib/emqx/ssl on each node of the cluster. Make sure the emqx user can read these files and that their permissions are set to 600.

  4. For Enterprise edition 4.4.0, add the following configuration to the end of ./releases/4.4.0/emqx.schema

    {mapping, "rpc.default_client_driver", "gen_rpc.default_client_driver",
    [{default, tcp}, {datatype, {enum, [tcp, ssl]}}]}.
  5. Add the following configuration to etc/rpc.conf for the Enterprise edition, or to emqx.conf for the community edition:

    rpc.driver=ssl
    rpc.default_client_driver=ssl
    rpc.certfile=/var/lib/emqx/ssl/domain.pem
    rpc.cacertfile=/var/lib/emqx/ssl/rootCA.pem
    rpc.keyfile=/var/lib/emqx/ssl/domain.key
    rpc.enable_ssl=5369
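
Before restarting a node, it can help to confirm that the three files from step 3 are in place with the expected mode. The following Python sketch is one possible pre-flight check; the function name check_tls_files is hypothetical, and the script does not verify file ownership by the emqx user or the validity of the certificates.

```python
import os
import stat


def check_tls_files(directory, names=("domain.pem", "domain.key", "rootCA.pem")):
    """Return a list of problems found; an empty list means all checks passed."""
    problems = []
    for name in names:
        path = os.path.join(directory, name)
        if not os.path.isfile(path):
            problems.append(f"{path}: missing")
            continue
        # Compare only the permission bits (owner/group/other) with 600.
        mode = stat.S_IMODE(os.stat(path).st_mode)
        if mode != 0o600:
            problems.append(f"{path}: mode is {mode:o}, expected 600")
    return problems


if __name__ == "__main__":
    for problem in check_tls_files("/var/lib/emqx/ssl"):
        print(problem)
```

Run it on each node of the cluster; no output means the files exist and have mode 600.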

EMQX Broker Cluster protocol settings

Each node in the Erlang cluster can be connected through TCPv4, TCPv6 or TLS, and the connection method can be configured in etc/emqx.conf:

| Configuration name | Type | Default value | Description |
| --- | --- | --- | --- |
| cluster.proto_dist | enum | inet_tcp | Distribution protocol. Possible values: inet_tcp (TCP over IPv4), inet6_tcp (TCP over IPv6), inet_tls (TLS) |
| node.ssl_dist_optfile | file path | etc/ssl_dist.conf | When cluster.proto_dist is set to inet_tls, you need to configure the etc/ssl_dist.conf file and specify the TLS certificate. |

EMQX Broker Distributed cluster design

The basic function of EMQX Broker distribution is to forward and publish messages to subscribers on each node, as shown in the following figure:

image

To achieve this, EMQX Broker maintains several data structures related to it: subscription tables, routing tables, and topic trees.

Subscription Table: Topics-Subscribers

When an MQTT client subscribes to a topic, EMQX Broker maintains a Subscription Table for the Topic -> Subscriber mapping. The subscription table exists only on the EMQX Broker node where the subscriber is located, for example:

  node1:
      topic1 -> client1, client2
      topic2 -> client3
  node2:
      topic1 -> client4

Route Table: Topic-Node

All nodes in the same cluster replicate a Topic -> Node mapping table, for example:

  topic1 -> node1, node2
  topic2 -> node3
  topic3 -> node2, node4

Topic tree: topic matching with wildcards

In addition to the routing table, each node in the EMQX Broker cluster also maintains a copy of the Topic Trie.

The following topic-subscription relationship is an example:

| Client | Node | Subscribed topic |
| --- | --- | --- |
| client1 | node1 | t/+/x, t/+/y |
| client2 | node2 | t/# |
| client3 | node3 | t/+/x, t/a |

When all subscriptions are completed, EMQX Broker maintains the following Topic Trie and Route Table:

image
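
As a rough illustration of how a topic trie resolves wildcards, the Python sketch below stores the subscribed topic filters from the table above and looks up the filters that match a published topic. The TopicTrie class is a toy written for this page under the standard MQTT wildcard rules ('+' matches one level, '#' matches all remaining levels); it is not EMQX's actual data structure.

```python
class TopicTrie:
    """Toy trie keyed by topic levels; a node's 'filter' is set where a filter ends."""

    def __init__(self):
        self.root = {"children": {}, "filter": None}

    def insert(self, topic_filter):
        node = self.root
        for level in topic_filter.split("/"):
            node = node["children"].setdefault(
                level, {"children": {}, "filter": None}
            )
        node["filter"] = topic_filter

    def match(self, topic):
        """Return all stored filters that match a concrete (wildcard-free) topic."""
        found = []
        self._walk(self.root, topic.split("/"), found)
        return sorted(found)

    def _walk(self, node, levels, found):
        # '#' matches every remaining level, including none at all.
        hash_child = node["children"].get("#")
        if hash_child is not None and hash_child["filter"] is not None:
            found.append(hash_child["filter"])
        if not levels:
            if node["filter"] is not None:
                found.append(node["filter"])
            return
        head, rest = levels[0], levels[1:]
        # Follow both the literal level and the single-level wildcard '+'.
        for key in (head, "+"):
            child = node["children"].get(key)
            if child is not None:
                self._walk(child, rest, found)


trie = TopicTrie()
for topic_filter in ["t/+/x", "t/+/y", "t/#", "t/a"]:
    trie.insert(topic_filter)

print(trie.match("t/a"))    # ['t/#', 't/a']
print(trie.match("t/b/x"))  # ['t/#', 't/+/x']
```

Each matched filter is then used as a key into the route table to find the nodes that have subscribers.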

Message Distribution Process

When an MQTT client publishes a message, the node where it is located retrieves the route table and forwards the message to the relevant node according to the message topic, and then the relevant node retrieves the local subscription table and sends the message to the relevant subscriber.

For example, when client1 publishes a message to the topic t/a, the routing and distribution of the message between nodes are as follows:

  1. client1 publishes a message with the topic t/a to the node1
  2. By querying the topic tree, node1 learns that t/a can match the two existing topics of t/a and t/#.
  3. By querying the route table, node1 learns that topic t/a has subscribers only on node3, and topic t/# has subscribers only on node2. So node1 forwards the message to node2 and node3.
  4. After node2 receives the forwarded t/a message, it queries the local subscription table to obtain the subscribers who have subscribed to t/# on this node and distributes the message to them.
  5. After node3 receives the forwarded t/a message, it queries the local subscription table to obtain the subscribers who have subscribed to t/a on this node and distributes the message to them.
  6. Message forwarding and distribution are finished.
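
The steps above can be sketched as a small Python simulation. The ROUTES and SUBSCRIPTIONS dictionaries are filled in from the example subscriptions listed earlier (client1 on node1, client2 on node2, client3 on node3); the names matches and publish are hypothetical, and the linear scan over filters stands in for the topic tree lookup.

```python
def matches(topic_filter, topic):
    """Check a concrete topic against an MQTT filter with '+' and '#' wildcards."""
    f_levels, t_levels = topic_filter.split("/"), topic.split("/")
    for i, f in enumerate(f_levels):
        if f == "#":
            return True
        if i >= len(t_levels) or (f != "+" and f != t_levels[i]):
            return False
    return len(f_levels) == len(t_levels)


# Replicated on every node: topic filter -> nodes that have subscribers.
ROUTES = {
    "t/+/x": {"node1", "node3"},
    "t/+/y": {"node1"},
    "t/#": {"node2"},
    "t/a": {"node3"},
}

# Partitioned: each node only holds its own topic filter -> subscribers table.
SUBSCRIPTIONS = {
    "node1": {"t/+/x": ["client1"], "t/+/y": ["client1"]},
    "node2": {"t/#": ["client2"]},
    "node3": {"t/+/x": ["client3"], "t/a": ["client3"]},
}


def publish(topic):
    """Return {node: [clients]} describing who receives a message on topic."""
    # Step 2: find the topic filters that match the published topic.
    matched = [f for f in ROUTES if matches(f, topic)]
    # Step 3: the route table gives the nodes to forward to.
    targets = set().union(*(ROUTES[f] for f in matched))
    deliveries = {}
    # Steps 4-5: each target node consults its local subscription table.
    for node in sorted(targets):
        local = SUBSCRIPTIONS[node]
        clients = {c for f in matched for c in local.get(f, [])}
        deliveries[node] = sorted(clients)
    return deliveries


print(publish("t/a"))  # {'node2': ['client2'], 'node3': ['client3']}
```

Note that node1 never needs to know which clients are subscribed on node2 or node3; it only needs the replicated route table.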

Data partition and sharing

EMQX Broker’s subscription table is partitioned in the cluster, while the topic tree and routing table are replicated.

Node discovery and automatic clustering

EMQX Broker supports Autocluster based on the Ekka library. Ekka is a cluster management library developed for Erlang / OTP applications. It supports Service Discovery, Autocluster, Network Partition Autoheal, and Autoclean of Erlang nodes.

EMQX supports multiple node discovery strategies:

| Strategy | Description |
| --- | --- |
| manual | Creating a cluster manually |
| static | Autocluster of static node lists |
| dns | Autocluster by DNS A records |
| etcd | Autocluster by etcd |
| k8s | Autocluster of Kubernetes service |

Note: mcast discovery strategy has been deprecated and will be removed in the future releases.

Creating a cluster manually

The default configuration is to create a cluster manually. Nodes are added with the ./bin/emqx_ctl cluster join <Node> command:

  cluster.discovery = manual

Autocluster based on static node list

Configure a fixed node list to automatically discover and create clusters:

  cluster.discovery = static
  cluster.static.seeds = emqx1@127.0.0.1,emqx2@127.0.0.1

Autocluster based on mcast

Automatically discover and create clusters based on UDP multicast:

  cluster.discovery = mcast
  cluster.mcast.addr = 239.192.0.1
  cluster.mcast.ports = 4369,4370
  cluster.mcast.iface = 0.0.0.0
  cluster.mcast.ttl = 255
  cluster.mcast.loop = on

Autocluster based on DNS A records

Automatically discover and create clusters based on DNS A records:

  cluster.discovery = dns
  cluster.dns.name = localhost
  cluster.dns.app = ekka
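
The idea behind this strategy can be illustrated with a short Python sketch: resolve the A records of cluster.dns.name and pair each address with cluster.dns.app to obtain candidate node names. That the node names take the form <app>@<ip> is an assumption made here for illustration, and discover_nodes is a hypothetical helper rather than Ekka's API. A stub resolver is used in the example so that the output does not depend on the local DNS setup.

```python
import socket


def discover_nodes(dns_name, app, resolve=socket.gethostbyname_ex):
    """Resolve the A records of dns_name and build '<app>@<ip>' node names."""
    # gethostbyname_ex returns (hostname, alias_list, ipv4_address_list).
    _hostname, _aliases, addresses = resolve(dns_name)
    return [f"{app}@{ip}" for ip in sorted(set(addresses))]


def stub_resolver(name):
    """Stand-in for a DNS lookup that returns two A records (made-up addresses)."""
    return (name, [], ["192.168.0.20", "192.168.0.10"])


print(discover_nodes("emqx.example.internal", "ekka", resolve=stub_resolver))
# ['ekka@192.168.0.10', 'ekka@192.168.0.20']
```

Dropping the resolve argument makes the function query the system resolver instead of the stub.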

Autocluster based on etcd

Automatically discover and create clusters based on etcd:

  cluster.discovery = etcd
  cluster.etcd.server = http://127.0.0.1:2379
  cluster.etcd.prefix = emqcl
  cluster.etcd.node_ttl = 1m

Autocluster based on Kubernetes

Automatically discover and create clusters based on Kubernetes:

  cluster.discovery = k8s
  cluster.k8s.apiserver = http://10.110.111.204:8080
  cluster.k8s.service_name = ekka
  cluster.k8s.address_type = ip
  cluster.k8s.app_name = ekka

Introduction to manual cluster management

Deploy an EMQX Broker cluster on two servers, s1.emqx.io and s2.emqx.io:

| Node name | Server | IP address |
| --- | --- | --- |
| emqx@s1.emqx.io or emqx@192.168.0.10 | s1.emqx.io | 192.168.0.10 |
| emqx@s2.emqx.io or emqx@192.168.0.20 | s2.emqx.io | 192.168.0.20 |

Tip

The format of a node name is Name@Host, where Host must be an IP address or an FQDN (server name plus domain name, such as s1.emqx.io).
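
A quick way to sanity-check a node name against this rule is sketched below in Python. The function parse_node_name is hypothetical, and the FQDN test (at least two non-empty dot-separated labels) is a deliberately loose heuristic rather than a full hostname validator.

```python
import ipaddress


def parse_node_name(node_name):
    """Split 'Name@Host' and check that Host is an IP address or looks like an FQDN."""
    name, sep, host = node_name.partition("@")
    if not sep or not name or not host:
        raise ValueError(f"expected Name@Host, got {node_name!r}")
    try:
        ipaddress.ip_address(host)
        return name, host
    except ValueError:
        pass  # Not an IP address; fall through to the FQDN heuristic.
    labels = host.split(".")
    if len(labels) < 2 or not all(labels):
        raise ValueError(f"host {host!r} is neither an IP address nor an FQDN")
    return name, host


print(parse_node_name("emqx@s1.emqx.io"))    # ('emqx', 's1.emqx.io')
print(parse_node_name("emqx@192.168.0.10"))  # ('emqx', '192.168.0.10')
```

A short host name without a domain part, such as emqx@localhost, is rejected by this check.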

Configure emqx@s1.emqx.io node

emqx/etc/emqx.conf:

  node.name = emqx@s1.emqx.io
  # or
  node.name = emqx@192.168.0.10

Configure through environment variables:

  export EMQX_NODE_NAME=emqx@s1.emqx.io && ./bin/emqx start

Tip

Once a node has joined the cluster, its node name cannot be changed.

Configure emqx@s2.emqx.io node

emqx/etc/emqx.conf:

  node.name = emqx@s2.emqx.io
  # or
  node.name = emqx@192.168.0.20

Node joins the cluster

After both nodes are started, run the join command on s2.emqx.io:

  $ ./bin/emqx_ctl cluster join emqx@s1.emqx.io
  Join the cluster successfully.
  Cluster status: [{running_nodes,['emqx@s1.emqx.io','emqx@s2.emqx.io']}]

Or run it on s1.emqx.io:

  $ ./bin/emqx_ctl cluster join emqx@s2.emqx.io
  Join the cluster successfully.
  Cluster status: [{running_nodes,['emqx@s1.emqx.io','emqx@s2.emqx.io']}]

Query the cluster status on any node:

  $ ./bin/emqx_ctl cluster status
  Cluster status: [{running_nodes,['emqx@s1.emqx.io','emqx@s2.emqx.io']}]

Exit the cluster

There are two ways for a node to exit the cluster:

  1. leave: make this node leave the cluster
  2. force-leave: remove another node from the cluster

Let emqx@s2.emqx.io actively exit the cluster:

  $ ./bin/emqx_ctl cluster leave

Or remove the emqx@s2.emqx.io node from the cluster on s1.emqx.io:

  $ ./bin/emqx_ctl cluster force-leave emqx@s2.emqx.io

Start a cluster on single machine

For users who only have one server, the pseudo-distributed starting mode can be used. Note that to start two or more nodes on one machine, the listening ports of the additional nodes must be changed to avoid port conflicts.

The basic process is to copy the emqx folder and name the copy emqx2. Then add an offset to every listening port of the original emqx and use the results as the listening ports of the emqx2 node. For example, change the MQTT/TCP listening port from the default 1883 to 2883 for emqx2. Refer to the Cluster Script for these operations, and to Configuration Instructions and Configuration Items for details.

Network Partition Autoheal

EMQX supports Network Partition Autoheal, which can be configured in etc/emqx.conf:

  cluster.autoheal = on

Network Partition Autoheal Process:

  1. The node performs Network Partition confirmation 3 seconds after receiving the inconsistent_database event from Mnesia;
  2. After the node confirms that the Network Partition has occurred, it reports the message to the Leader node (the earliest start node in the cluster);
  3. After a delay, the Leader node creates a SplitView once all nodes are online;
  4. The Leader node selects the self-healing Coordinator node in the majority partition;
  5. The Coordinator node restarts the minority partition node to restore the cluster.
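
Steps 3 to 5 can be pictured with the following Python sketch, which takes a split view (one list of nodes per partition) and returns a coordinator plus the nodes to restart. The selection rules used here (the largest partition is the majority, ties are broken by node name, and the first node by name coordinates) are illustrative assumptions; the text above does not specify how Ekka makes these choices, and plan_autoheal is a hypothetical name.

```python
def plan_autoheal(partitions):
    """Pick a coordinator in the majority partition and list minority nodes to restart.

    partitions is a non-empty list of node-name lists, one per network partition.
    """
    # Largest partition first; ties broken by sorted node names (an assumption).
    ordered = sorted(partitions, key=lambda part: (-len(part), sorted(part)))
    majority, minorities = ordered[0], ordered[1:]
    # Assumption: any majority node may coordinate, so take the first by name.
    coordinator = sorted(majority)[0]
    to_restart = sorted(node for part in minorities for node in part)
    return coordinator, to_restart


split_view = [
    ["emqx@s4.emqx.io"],
    ["emqx@s1.emqx.io", "emqx@s2.emqx.io", "emqx@s3.emqx.io"],
]
print(plan_autoheal(split_view))
# ('emqx@s1.emqx.io', ['emqx@s4.emqx.io'])
```

Restarting only the minority side keeps the larger part of the cluster serving clients while the partition is healed.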

Autoclean of Cluster nodes

EMQX supports automatically removing down nodes from the cluster (Autoclean), which can be configured in etc/emqx.conf:

  cluster.autoclean = 5m

Firewall settings

The Node Discovery Ports

If the environment variable WITH_EPMD=1 is set in advance, the epmd (listening port 4369) will be enabled for node discovery when emqx is started, which is called epmd mode.

If the environment variable WITH_EPMD is not set, epmd is not enabled when emqx is started, and emqx ekka is used for node discovery, which is also the default method of node discovery since version 4.0. This is called ekka mode.

epmd mode:

If there is a firewall between cluster nodes, it must open TCP port 4369 on each node so that peers can query each other's listening port. The firewall should also allow nodes to connect to the ports in the configurable range from node.dist_listen_min to node.dist_listen_max (inclusive; the default is 6369 for both).

ekka mode (default mode since version 4.0):

In ekka mode, the port mapping is fixed by convention rather than dynamic as in epmd mode. The node.dist_listen_min and node.dist_listen_max settings have no effect in this case.

If there is a firewall between the cluster nodes, it must allow the conventional listening port so that nodes can connect to each other. See below for the port mapping rule in ekka mode.

Erlang distribution port mapping rule in ekka mode: ListeningPort = BasePort + Offset, where BasePort is 4370 (not configurable) and Offset is the numeric suffix of the node's name. If the node name does not have a numeric suffix, Offset is 0.

For example, node.name = emqx@192.168.0.12 in emqx.conf makes the node listen on port 4370; a node named emqx1 (or emqx-1) listens on port 4371, and so on.

The Cluster RPC Port

Each emqx node also listens on a (conventional) port for the RPC channels, which the firewall should also allow. The port mapping rule is the same as for the node discovery ports in ekka mode, but with BasePort = 5370. That is, node.name = emqx@192.168.0.12 in emqx.conf makes the node listen on port 5370; a node named emqx1 (or emqx-1) listens on port 5371, and so on.
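
When writing firewall rules, the two mapping rules (ekka mode distribution port and cluster RPC port) can be computed with a short Python sketch like the one below. The function names are hypothetical; the base ports 4370 and 5370 and the "numeric suffix of the name part" rule come from the text above.

```python
import re

DIST_BASE_PORT = 4370  # Erlang distribution base port in ekka mode
RPC_BASE_PORT = 5370   # cluster RPC base port


def node_offset(node_name):
    """Numeric suffix of the name part before '@'; 0 when there is none."""
    name = node_name.split("@", 1)[0]
    suffix = re.search(r"(\d+)$", name)
    return int(suffix.group(1)) if suffix else 0


def listening_ports(node_name):
    """Return the ports a firewall must allow for the given node name."""
    offset = node_offset(node_name)
    return {"dist": DIST_BASE_PORT + offset, "rpc": RPC_BASE_PORT + offset}


print(listening_ports("emqx@192.168.0.12"))    # {'dist': 4370, 'rpc': 5370}
print(listening_ports("emqx1@192.168.0.12"))   # {'dist': 4371, 'rpc': 5371}
print(listening_ports("emqx-1@192.168.0.12"))  # {'dist': 4371, 'rpc': 5371}
```

Only the part of the node name before the @ is inspected, so digits in the IP address or host name do not affect the offset.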