产品概述
EMQ X 是目前全球市场广泛应用的百万级开源 MQTT 消息服务器,全球市场(西欧、北美、印度、中国)累积超 5000 家企业用户,产品环境下部署超 1 万节点,累计下载量超过 50 万,承载 MQTT 连接超 3000 万线。
EMQ X 企业版大幅改进系统设计架构,采用 Scalable RPC 机制,支持更稳定的节点集群与更高性能的消息路由。
EMQ X 企业版支持 MQTT 消息数据存储 Redis、MySQL、PostgreSQL、MongoDB、Cassandra、TimescaleDB、InfluxDB、DynamoDB、OpenTDSB 多种数据库。
EMQ X 企业版支持桥接转发 MQTT 消息到 Kafka、RabbitMQ、Pulsar、RocketMQ、MQTT Broker 企业消息中间件。
EMQ X 可以作为智能硬件、智能家居、物联网、车联网应用的百万级设备接入平台。
设计目标
EMQ X 是基于 Erlang/OTP 平台开发的开源物联网 MQTT 消息服务器。Erlang/OTP 是出色的软实时、低延时、分布式语言平台。MQTT 是轻量的、发布订阅模式的物联网消息协议。
EMQ X 设计目标是实现企业级高可靠,并支持承载海量物联网终端的 MQTT 连接,支持在海量物联网设备间低延时消息路由:
- 稳定承载大规模的 MQTT 客户端连接,单服务器节点支持 50 万到 100 万连接。
- 分布式节点集群,快速低延时的消息路由,单集群支持 1000 万规模的路由。
- 消息服务器内扩展,支持定制多种认证方式、高效存储消息到后端数据库。
- 完整物联网协议支持,MQTT、MQTT-SN、CoAP、LwM2M、WebSocket 或私有协议支持。
产品功能
- Scalable RPC 架构: 分离 Erlang 自身的集群通道与 EMQ X 节点间的数据通道。
- Redis 存储订阅关系、设备在线状态、MQTT 消息、保留消息,发布 SUB/UNSUB 事件。
- MySQL 存储订阅关系、设备在线状态、MQTT 消息、保留消息。
- PostgreSQL 存储订阅关系、设备在线状态、MQTT 消息、保留消息。
- MongoDB 存储订阅关系、设备在线状态、MQTT 消息、保留消息。
- Cassandra 存储订阅关系、设备在线状态、MQTT 消息、保留消息。
- DynamoDB 存储订阅关系、设备在线状态、MQTT 消息、保留消息。
- InfluxDB 存储 MQTT 时序消息。
- OpenTDSB 存储 MQTT 时序消息。
- TimescaleDB 存储 MQTT 时序消息。
- Kafka 桥接:EMQ X 内置 Bridge 直接转发 MQTT 消息、设备上下线事件到 Kafka。
- RabbitMQ 桥接:EMQ X 内置 Bridge 直接转发 MQTT 消息、设备上下线事件到 RabbitMQ。
- Pulsar 桥接:EMQ X 内置 Bridge 直接转发 MQTT 消息、设备上下线事件到 Pulsar。
- RocketMQ 桥接:EMQ X 内置 Bridge 直接转发 MQTT 消息、设备上下线事件到 RocketMQ。
- Rule Engine:将 EMQ X 的事件、消息 转换成指定格式,然后存入数据库表,或者发送到消息队列等。
- Schema Registry:将 EMQ X 的事件、消息 提供了数据编解码能力。
Scalable RPC 架构
EMQ X 企业版改进了分布节点间的通信机制,分离 Erlang 自身的集群通道与 EMQ X 的数据通道,大幅提高集群节点间的消息吞吐与集群稳定性:
Tip
虚线为 Erlang 的分布集群通道,实线为节点间消息数据通道。
Scalable RPC 配置:
## TCP server port for RPC.
rpc.tcp_server_port = 5369
## TCP port for outgoing RPC connections.
rpc.tcp_client_port = 5369
## RCP Client connect timeout.
rpc.connect_timeout = 5s
## TCP send timeout of RPC client and server.
rpc.send_timeout = 5s
## Authentication timeout
rpc.authentication_timeout = 5s
## Default receive timeout for call() functions
rpc.call_receive_timeout = 15s
## Socket idle keepalive.
rpc.socket_keepalive_idle = 900s
## TCP Keepalive probes interval.
rpc.socket_keepalive_interval = 75s
## Probes lost to close the connection
rpc.socket_keepalive_count = 9
Tip
集群节点间如存在防火墙,必须打开 5369 端口。
代理订阅
EMQ X 企业版支持服务端代理订阅功能,MQTT 客户端上线后无需发送 SUBSCRIBE 请求,EMQ X 代理从 Redis、MySQL 等数据库帮客户端加载订阅。
EMQ X 代理订阅功能在低功耗、低带宽网络环境下,可以节省客户端到 EMQ X 服务器的往返报文与流量。
消息数据存储
EMQ X 企业版支持存储订阅关系、MQTT 消息、设备状态到 Redis、MySQL、PostgreSQL、MongoDB、Cassandra、TimescaleDB、InfluxDB、DynamoDB、OpenTDSB 数据库:
数据存储相关配置,详见”数据存储”章节。
消息桥接转发
EMQ X 企业版支持直接转发 MQTT 消息到 RabbitMQ、Kafka、Pulsar、RocketMQ、MQTT Broker,可作为百万级的物联网接入服务器(IoT Hub):
规则引擎
EMQ X 规则引擎可以灵活地处理消息和事件。EMQ X 企业版规则引擎支持消息重新发布;桥接数据到 Kafka、Pulsar、RocketMQ、RabbitMQ、MQTT Broker;保存数据到 MySQL、PostgreSQL、Redis、MongoDB、DynamoDB、Cassandra、InfluxDB、OpenTSDB、TimescaleDB;发送数据到 WebServer:
规则引擎相关配置,详见”规则引擎”章节。
编解码
Schema Registry 目前可支持三种格式的编解码: Avro (opens new window) , Protobuf (opens new window) ,以及自定义编码。其中 Avro 和 Protobuf 是依赖 Schema 的数据格式,编码后的数据为二进制,解码后为 Map 格式 。解码后的数据可直接被规则引擎和其他插件使用。用户自定义的 (3rd-party) 编解码服务通过 HTTP 或 TCP 回调的方式,进行更加贴近业务需求的编解码。
编解码相关配置,详见”编解码”章节。