RocketMQ Connect in Action 4

SFTP Server (file data) -> RocketMQ Connect -> SFTP Server (file)

Preparation

Start RocketMQ

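  1. Linux/Unix/Mac
  2. 64-bit JDK 1.8+
  3. Maven 3.2.x or later
  4. Start RocketMQ. Either RocketMQ 4.x or RocketMQ 5.x can be used.
  5. Use the bundled tools to verify that RocketMQ can send and receive messages. See the RocketMQ 4.x or RocketMQ 5.x documentation for details.

Here the NAMESRV_ADDR environment variable tells the tool client that the RocketMQ NameServer address is localhost:9876.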

    #$ cd distribution/target/rocketmq-4.9.7/rocketmq-4.9.7
    $ cd distribution/target/rocketmq-5.1.4/rocketmq-5.1.4
    $ export NAMESRV_ADDR=localhost:9876
    $ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
    SendResult [sendStatus=SEND_OK, msgId= ...
    $ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
    ConsumeMessageThread_%d Receive New Messages: [MessageExt...

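Note: RocketMQ can create Topics and Groups automatically. When a message is sent or a subscription is made and the corresponding Topic or Group does not exist, RocketMQ creates it, so there is no need to create the Topic and Group in advance.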

Build the Connector Runtime

    git clone https://github.com/apache/rocketmq-connect.git
    cd rocketmq-connect
    export RMQ_CONNECT_HOME=`pwd`
    mvn -Prelease-connect -Dmaven.test.skip=true clean install -U

Build the SFTP Connector Plugin

    cd $RMQ_CONNECT_HOME/connectors/rocketmq-connect-sftp/
    mvn clean package -Dmaven.test.skip=true

Copy the built SFTP RocketMQ Connector package into the plugin directory loaded by the Runtime

    mkdir -p /Users/YourUsername/rocketmqconnect/connector-plugins
    cp target/rocketmq-connect-sftp-0.0.1-SNAPSHOT-jar-with-dependencies.jar /Users/YourUsername/rocketmqconnect/connector-plugins

Run the Connector Worker in standalone mode

connect-standalone.conf holds the RocketMQ connection address and other settings; adjust it to match your environment.

    cd $RMQ_CONNECT_HOME/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
    vim conf/connect-standalone.conf

A sample configuration:

    workerId=standalone-worker
    storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot
    ## Http port for user to access REST API
    httpPort=8082
    # RocketMQ namesrvAddr
    namesrvAddr=localhost:9876
    # RocketMQ acl
    aclEnable=false
    #accessKey=rocketmq
    #secretKey=12345678
    clusterName="DefaultCluster"
    # Plugin path, used by the Worker to load Source/Sink Connector plugins
    pluginPaths=/Users/YourUsername/rocketmqconnect/connector-plugins

In standalone mode, RocketMQ Connect persists the sync offsets to the local directory given by storePathRootDir:

    storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot

To reset the sync offsets, delete the persisted offset files:

    rm -rf /Users/YourUsername/rocketmqconnect/storeRoot/*
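The reset step depends on the same path that is set in connect-standalone.conf, so it can help to read the path from the file rather than typing it twice. The following is a minimal sketch, assuming the file uses plain key=value lines as in the sample configuration; conf_get is a hypothetical helper name and is not part of RocketMQ Connect.

```shell
#!/bin/sh
# conf_get FILE KEY
# Hypothetical helper (not part of RocketMQ Connect): prints the value of KEY
# from a key=value file. Lines that start with '#' never match, so
# commented-out settings are ignored. If KEY appears more than once, the last
# value is printed.
# Assumption: KEY is a plain alphanumeric name such as storePathRootDir,
# because it is placed inside a regular expression unescaped.
conf_get() {
    sed -n "s/^[[:space:]]*$2[[:space:]]*=[[:space:]]*//p" "$1" | tail -n 1
}
```

For example, root=$(conf_get conf/connect-standalone.conf storePathRootDir) followed by rm -rf "${root:?}"/* would clear the offsets. The :? expansion makes the shell abort when the key was not found, which avoids running rm against an empty path.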

Start the Connector Worker in standalone mode

    sh bin/connect-standalone.sh -c conf/connect-standalone.conf &

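Set up the SFTP server

SFTP (SSH File Transfer Protocol) is a file transfer protocol for securely transferring files between computers. It runs over an SSH (Secure Shell) connection, which provides the encryption and authentication.

For ease of demonstration, this walkthrough uses the SFTP service built into macOS, which becomes available once "Remote Login" is enabled. For details, see Apple's "Allow a remote computer to access your Mac" documentation.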

Create the source test file

Create the source test file source.txt and write the test data into it:

    mkdir -p /Users/YourUsername/rocketmqconnect/sftp-test/
    cd /Users/YourUsername/rocketmqconnect/sftp-test/
    touch source.txt
    echo '张三|100000202211290001|20221129001|30000.00|2022-11-28|03:00:00|7.00
    李四|100000202211290002|20221129002|40000.00|2022-11-28|04:00:00|9.00
    赵五|100000202211290003|20221129003|50000.00|2022-11-28|05:00:00|12.00' >> source.txt

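Log in to the SFTP service to verify that it is reachable. Run the command below and enter the password to open a session on the SFTP server:

    # sftp -P port YourUsername@hostname
    sftp -P 22 YourUsername@127.0.0.1

Note: because the SFTP service is provided by macOS on the local machine, the address is 127.0.0.1 and the port is the default, 22.

    sftp> cd /Users/YourUsername/rocketmqconnect/sftp-test/
    sftp> ls source.txt
    sftp> bye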

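Start the Connectors

Start the SFTP source connector

Run the following command to start the SFTP source connector. The connector connects to the SFTP service and reads source.txt. Each line it reads is parsed, wrapped in a generic ConnectRecord object, and sent to the RocketMQ Topic for the sink connector to consume.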

    curl -X POST --location "http://localhost:8082/connectors/SftpSourceConnector" --http1.1 \
      -H "Host: localhost:8082" \
      -H "Content-Type: application/json" \
      -d '{
        "connector.class": "org.apache.rocketmq.connect.http.sink.SftpSourceConnector",
        "host": "127.0.0.1",
        "port": 22,
        "username": "YourUsername",
        "password": "yourPassword",
        "filePath": "/Users/YourUsername/rocketmqconnect/sftp-test/source.txt",
        "connect.topicname": "sftpTopic",
        "fieldSeparator": "|",
        "fieldSchema": "username|idCardNo|orderNo|orderAmount|trxDate|trxTime|profit"
      }'

If the curl request returns status:200, the connector was created successfully. Sample response:

    {"status":200,"body":{"connector.class":"…

The following log line indicates that the SFTP source connector started successfully:

    tail -100f ~/logs/rocketmqconnect/connect_runtime.log

    Start connector SftpSourceConnector and set target state STARTED successed!!
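To make the roles of fieldSeparator and fieldSchema concrete, the sketch below splits one line of source.txt on the separator and pairs each value with the field name at the same position in the schema. It only illustrates the mapping described above and is not the connector's actual implementation; map_fields is a hypothetical name.

```shell
#!/bin/sh
# map_fields SCHEMA LINE [SEPARATOR]
# Hypothetical illustration (not the connector's code): prints one name=value
# pair per field, pairing the Nth name in SCHEMA with the Nth value in LINE.
# SEPARATOR defaults to '|' and is assumed to be a single character.
map_fields() {
    schema=$1
    line=$2
    sep=${3:-|}
    printf '%s\n%s\n' "$schema" "$line" | awk -F"$sep" '
        # First input line is the schema: remember the field names.
        NR == 1 { n = NF; for (i = 1; i <= NF; i++) names[i] = $i; next }
        # Second input line is the data: print each value under its name.
        { for (i = 1; i <= n; i++) printf "%s=%s\n", names[i], $i }
    '
}
```

Calling map_fields with the fieldSchema value from the request and the first line of source.txt prints seven pairs, starting with the username field and ending with the profit field.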

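Start the SFTP sink connector

Run the following command to start the SFTP sink connector. The connector subscribes to the RocketMQ Topic and consumes its data, converts each message into one line of text, and writes that line to sink.txt over SFTP.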

    curl -X POST --location "http://localhost:8082/connectors/SftpSinkConnector" --http1.1 \
      -H "Host: localhost:8082" \
      -H "Content-Type: application/json" \
      -d '{
        "connector.class": "org.apache.rocketmq.connect.http.sink.SftpSinkConnector",
        "host": "127.0.0.1",
        "port": 22,
        "username": "YourUsername",
        "password": "yourPassword",
        "filePath": "/Users/YourUsername/rocketmqconnect/sftp-test/sink.txt",
        "connect.topicnames": "sftpTopic",
        "fieldSeparator": "|",
        "fieldSchema": "username|idCardNo|orderNo|orderAmount|trxDate|trxTime|profit"
      }'

If the curl request returns status:200, the connector was created successfully. Sample response:

    {"status":200,"body":{"connector.class":"…

The following log line indicates that the SFTP sink connector started successfully:

    tail -100f ~/logs/rocketmqconnect/connect_runtime.log

    Start connector SftpSinkConnector and set target state STARTED successed!!
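The status check described for both connectors can also be scripted. The sketch below assumes the response body has the shape shown in the sample responses, with a top-level "status" field; response_ok is a hypothetical helper, and it uses a pattern match rather than a real JSON parser, so a nested "status" field inside the body could in principle produce a false match.

```shell
#!/bin/sh
# response_ok JSON
# Hypothetical helper: succeeds (exit 0) when the given response text contains
# "status":200, allowing optional whitespace around the colon. The trailing
# group rejects longer numbers such as 2001.
response_ok() {
    printf '%s' "$1" | grep -Eq '"status"[[:space:]]*:[[:space:]]*200([^0-9]|$)'
}
```

One way to use it is to capture the curl output in a variable, for example by adding -s to the curl command and wrapping it in resp=$(...), and then run response_ok "$resp" before moving on to the log check.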

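Check whether the sink connector has written the data to the destination file:

    cat /Users/YourUsername/rocketmqconnect/sftp-test/sink.txt

If sink.txt has been created and its content matches source.txt, the whole pipeline is working correctly.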

Next, append more test data to the source file source.txt:

    cd /Users/YourUsername/rocketmqconnect/sftp-test/
    echo '张三x|100000202211290001|20221129001|30000.00|2022-11-28|03:00:00|7.00
    李四x|100000202211290002|20221129002|40000.00|2022-11-28|04:00:00|9.00
    赵五x|100000202211290003|20221129003|50000.00|2022-11-28|05:00:00|12.00' >> source.txt
    # Wait a few seconds, then check whether rocketmq-connect has replicated the data to the sink file
    sleep 10
    cat /Users/YourUsername/rocketmqconnect/sftp-test/sink.txt
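A fixed pause may be too short or longer than needed, depending on how quickly the connectors pick up the new lines. As an alternative, the sketch below polls the sink file until it holds at least the expected number of lines or a timeout expires. wait_for_lines is a hypothetical helper, and the default timeout of 30 seconds is an arbitrary choice.

```shell
#!/bin/sh
# wait_for_lines FILE COUNT [TIMEOUT_SECONDS]
# Hypothetical helper: polls FILE once per second and succeeds as soon as it
# contains at least COUNT lines. Fails if TIMEOUT_SECONDS (default 30) pass
# first. A missing file is treated as having zero lines.
wait_for_lines() {
    file=$1
    want=$2
    timeout=${3:-30}
    elapsed=0
    while :; do
        have=0
        if [ -f "$file" ]; then
            # grep -c '' counts every line, including an unterminated last one.
            have=$(grep -c '' "$file" || true)
        fi
        if [ "$have" -ge "$want" ]; then
            return 0
        fi
        if [ "$elapsed" -ge "$timeout" ]; then
            return 1
        fi
        sleep 1
        elapsed=$((elapsed + 1))
    done
}
```

With the three original lines plus the three appended ones, wait_for_lines sink.txt 6 would return once all six lines have arrived.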

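Note: the lines in the file may appear in a different order. rocketmq-connect-sftp sends and receives messages on the RocketMQ Topic as normal messages rather than ordered messages, and consumption of normal messages does not guarantee ordering.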
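Because the order may differ, a plain diff of source.txt and sink.txt can report differences even when every line was replicated. A minimal sketch of an order-insensitive comparison follows; same_lines is a hypothetical helper that sorts both files before comparing them.

```shell
#!/bin/sh
# same_lines FILE_A FILE_B
# Hypothetical helper: exit 0 when both files contain the same lines
# regardless of order, 1 when they differ, 2 when a file cannot be read or a
# temporary file cannot be created.
same_lines() {
    if [ ! -r "$1" ] || [ ! -r "$2" ]; then
        return 2
    fi
    a_sorted=$(mktemp) || return 2
    b_sorted=$(mktemp) || { rm -f "$a_sorted"; return 2; }
    # LC_ALL=C gives a byte-wise sort, so both files are ordered the same way
    # whatever the locale and whatever characters the lines contain.
    LC_ALL=C sort "$1" > "$a_sorted"
    LC_ALL=C sort "$2" > "$b_sorted"
    if cmp -s "$a_sorted" "$b_sorted"; then
        rc=0
    else
        rc=1
    fi
    rm -f "$a_sorted" "$b_sorted"
    return "$rc"
}
```

Running same_lines source.txt sink.txt in the sftp-test directory then succeeds when the two files hold the same set of lines, even if the sink file lists them in a different order.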