环境版本

  • 操作系统:CentOS release 6.6 (Final)
  • java版本: jdk1.8
  • zookeeper版本: zookeeper-3.4.11
  • kafka 版本: kafka_2.11-1.1.1.tgz
  • canal.kafka 版本: 请下载最新的安装包,本文以当前v1.0.26 alpha 5 的canal.kafka-1.0.26-SNAPSHOT.tar.gz 为例
  • MySQL版本 :5.7.18
  • 注意 : 关闭所有机器的防火墙,同时注意启动可以相互telnet ip 端口

    一、 安装jdk

  1. 此处省略

二、 安装zookeeper

2.1 下载源码包,并解压

官网下载地址:http://www.apache.org/dyn/closer.cgi/zookeeper

  1. wget http://mirror.olnevhost.net/pub/apache/zookeeper/zookeeper-3.4.11/zookeeper-3.4.11.tar.gz
  2. tar zxvf zookeeper-3.4.11.tar.gz
  3. mv zookeeper-3.4.11 /usr/local/zookeeper

2.2 修改环境变量

编辑 /etc/profile 文件, 在文件末尾添加以下环境变量配置:

  1. # ZooKeeper Env
  2. export ZOOKEEPER_HOME=/usr/local/zookeeper
  3. export PATH=$PATH:$ZOOKEEPER_HOME/bin

运行以下命令使环境变量生效:source /etc/profile

2.3 重命名配置文件

初次使用 ZooKeeper 时,需要将$ZOOKEEPER_HOME/conf 目录下的 zoo_sample.cfg 重命名为 zoo.cfg, zoo.cfg

  1. mv $ZOOKEEPER_HOME/conf/zoo_sample.cfg $ZOOKEEPER_HOME/conf/zoo.cfg

2.4 单机模式—修改配置文件

创建目录/usr/local/zookeeper/data/usr/local/zookeeper/logs修改配置文件

  1. tickTime=2000
  2. initLimit=10
  3. syncLimit=5
  4. dataDir=/usr/local/zookeeper/data
  5. dataLogDir=/usr/local/zookeeper/logs
  6. clientPort=2181

如果是多节点,配置文件中尾部增加

  1. server.1=192.168.1.110:2888:3888
  2. server.2=192.168.1.111:2888:3888
  3. server.3=192.168.1.112:2888:3888

同时,增加

  1. #master
  2. echo "1">/usr/local/zookeeper/data/myid
  3. #slave1
  4. echo "2">/usr/local/zookeeper/data/myid
  5. #slave2
  6. echo "3">/usr/local/zookeeper/data/myid

2.5 启动 ZooKeeper 服务

  1. # cd /usr/local/zookeeper/zookeeper-3.4.11/bin
  2. # ./zkServer.sh start
  3. ZooKeeper JMX enabled by default
  4. Using config: /usr/local/zookeeper/zookeeper-3.4.11/bin/../conf/zoo.cfg
  5. Starting zookeeper ... STARTED
  6. zkServer.sh status
  7. ZooKeeper JMX enabled by default
  8. Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
  9. Mode: follower

2.6 验证zooKeeper服务

服务启动完成后,可以使用 telnet 和 stat 命令验证服务器启动是否正常:

  1. # telnet 127.0.0.1 2181
  2. Trying 127.0.0.1...
  3. Connected to 127.0.0.1.
  4. Escape character is '^]'.
  5. stat
  6. Zookeeper version: 3.4.11-37e277162d567b55a07d1755f0b31c32e93c01a0, built on 11/01/2017 18:06 GMT
  7. Clients:
  8. /127.0.0.1:48430[0](queued=0,recved=1,sent=0)
  9. Latency min/avg/max: 0/0/0
  10. Received: 1
  11. Sent: 0
  12. Connections: 1
  13. Outstanding: 0
  14. Zxid: 0x0
  15. Mode: standalone
  16. Node count: 4
  17. Connection closed by foreign host.

2.7 停止 ZooKeeper 服务

想要停止 ZooKeeper 服务, 可以使用如下命令:

  1. # cd /usr/local/zookeeper/zookeeper-3.4.11/bin
  2. # ./zkServer.sh stop
  3. ZooKeeper JMX enabled by default
  4. Using config: /usr/local/zookeeper/zookeeper-3.4.11/bin/../conf/zoo.cfg
  5. Stopping zookeeper ... STOPPED

三、zk ui安装 (选装,页面查看zk的数据)

拉取代码

  1. #git clone https://github.com/DeemOpen/zkui.git

源码编译需要安装 maven

  1. # wget http://repos.fedorapeople.org/repos/dchen/apache-maven/epel-apache-maven.repo -O /etc/yum.repos.d/epel-apache-maven.repo
  2. #cd zkui/
  3. #yum install -y maven
  4. #mvn clean install

修改配置文件默认值

  1. #vim config.cfg
  2. serverPort=9090 #指定端口
  3. zkServer=192.168.1.110:2181
  4. sessionTimeout=300000

启动程序至后台

2.0-SNAPSHOT 会随软件的更新版本不同而不同,执行时请查看target 目录中真正生成的版本

  1. nohup java -jar target/zkui-2.0-SNAPSHOT-jar-with-dependencies.jar &

用浏览器访问:

http://192.168.1.110:9090/

四、安装kafka

4.1 下载压缩包, 复制到固定目录并解压

到官网下载压缩包

  1. wget https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.1/kafka_2.11-1.1.1.tgz
  2. mkdir -p /usr/local/kafka
  3. cp kafka_2.11-1.1.1.tgz /usr/local/kafka
  4. tar -zxvf kafka_2.11-1.1.1.tgz

4.2 修改配置文件

vim /usr/local/kafka/kafka_2.11-1.1.1/config/server.properties 修改参数

  1. zookeeper.connect=192.168.1.110:2181
  2. listeners=PLAINTEXT://:9092
  3. advertised.listeners=PLAINTEXT://192.168.1.117:9092 #本机ip
  4. # ...

4.3 启动server bin/kafka-server-start.sh -daemon config/server.properties &

  • 查看所有topic

    1. # bin/kafka-topics.sh --list --zookeeper 192.168.1.110:2181
  • 查看指定topic 下面的数据

    1. # bin/kafka-console-consumer.sh ---bootstrap-server 192.168.1.117:9092 --from-beginning --topic example_t
    2. Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

4.7 参考链接

  1. https://kafka.apache.org/quickstart

五、 安装canal.kafka

5.1 下载压缩包

到官网下载最新压缩包,请下载 canal.kafka-latest.tar.gz。[下面以v1.0.26 alpha 5 的canal.kafka-1.0.26-SNAPSHOT.tar.gz 为例]

  1. https://github.com/alibaba/canal/releases

5.2 将canal.kafka 复制到固定目录并解压

  1. mkdir -p /usr/local/canal
  2. cp canal.kafka-1.0.26-SNAPSHOT.tar.gz /usr/local/canal
  3. tar -zxvf canal.kafka-1.0.26-SNAPSHOT.tar.gz

5.3 配置修改参数

修改instance 配置文件 vi conf/example/instance.properties

  1. # 按需修改成自己的数据库信息
  2. #################################################
  3. ...
  4. canal.instance.master.address=192.168.1.20:3306
  5. # username/password,数据库的用户名和密码
  6. ...
  7. canal.instance.dbUsername = canal
  8. canal.instance.dbPassword = canal
  9. ...
  10. #################################################

对应ip 地址的MySQL 数据库需进行相关初始化与设置详情参考 QuickStart : https://github.com/alibaba/canal/wiki/QuickStart

修改canal 配置文件vim /usr/local/canal/conf/canal.properties

  1. # ...
  2. canal_ip =192.168.1.99
  3. canal.withoutNetty =**true**
  4. # zk server地址
  5. canal.zkServers=192.168.1.110:2181,192.168.1.111:2181,192.168.1.112:2181
  6. canal.destinations= example
  7. # ...
  8. canal.instance.global.spring.xml = classpath:spring/default-instance.xml

详细请参考 AdminGuide : https://github.com/alibaba/canal/wiki/AdminGuide

修改canal.kafka 配置文件vim /usr/local/canal/conf/kafka.yml

  1. #kafka 集群用,进行分隔: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092
  2. servers: 192.168.1.117:9092
  3. retries: 0
  4. batchSize: 16384
  5. lingerMs: 1
  6. bufferMemory: 33554432
  7. # canal的批次大小,单位 k
  8. canalBatchSize: 50
  9. filterTransactionEntry: true
  10. canalDestinations:
  11. - canalDestination: example
  12. topic: example
  13. partition:
  14. # 一个destination可以对应多个topic
  15. # topics:
  16. # - topic: example
  17. # partition:

参数说明

  1. servers: 对应 kafka `ProducerConfig.BOOTSTRAP_SERVERS_CONFIG`
  2. retries: 对应kafka`ProducerConfig.RETRIES_CONFIG`
  3. batchSize: 对应kafka`ProducerConfig.BATCH_SIZE_CONFIG`
  4. lingerMs: 对应kafka`ProducerConfig.LINGER_MS_CONFIG`
  5. bufferMemory: 对应kafka`ProducerConfig.BUFFER_MEMORY_CONFIG`
  6. filterTransactionEntry: 过滤事务头、尾

5.4 启动

  1. cd /usr/local/canal/
  2. sh bin/startup.sh

5.5 查看日志

a.查看 logs/canal/canal.log

  1. vi logs/canal/canal.log

b. 查看instance的日志:

  1. vi logs/example/example.log

5.6 关闭

  1. cd /usr/local/canal/
  2. sh bin/stop.sh

5.7 使用canal.kafka.client 查看kafka topic中的代码样例

  1. # // 可参考 canal kafka client 中的实现
  2. com.alibaba.otter.canal.kafka.client.running.CanalKafkaClientExample

5.8 参考链接

  1. https://github.com/alibaba/canal/wiki/QuickStart
  2. https://github.com/alibaba/canal/wiki/AdminGuide

原文: https://github.com/alibaba/canal/wiki/Canal-Kafka-QuickStart