kafka

Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性: 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。 高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。 支持通过Kafka服务器和消费机集群来分区消息。 支持Hadoop并行数据加载。

本项目代码参考自 https://github.com/weiboad/kafka-php

组件要求

  • php: >=7.1.0
  • ext-swoole: ^4.4.5
  • easyswoole/component: ^2.0 -easyswoole/spl: ^1.1

安装方法

composer require easyswoole/kafka

仓库地址

easyswoole/kafka

基本使用

注册kafka服务

  1. namespace EasySwoole\EasySwoole;
  2. use App\Producer\Process as ProducerProcess;
  3. use App\Consumer\Process as ConsumerProcess;
  4. use EasySwoole\EasySwoole\Swoole\EventRegister;
  5. use EasySwoole\EasySwoole\AbstractInterface\Event;
  6. use EasySwoole\Http\Request;
  7. use EasySwoole\Http\Response;
  8. class EasySwooleEvent implements Event
  9. {
  10. public static function initialize()
  11. {
  12. // TODO: Implement initialize() method.
  13. date_default_timezone_set('Asia/Shanghai');
  14. }
  15. public static function mainServerCreate(EventRegister $register)
  16. {
  17. // TODO: Implement mainServerCreate() method.
  18. // 生产者
  19. \EasySwoole\Component\Process\Manager::getInstance()->addProcess(new ProducerProcess());
  20. // 消费者
  21. \EasySwoole\Component\Process\Manager::getInstance()->addProcess(new ConsumerProcess());
  22. }
  23. }

生产者

  1. namespace App\Producer;
  2. use EasySwoole\Component\Process\AbstractProcess;
  3. use EasySwoole\Kafka\Config\ProducerConfig;
  4. use EasySwoole\Kafka\Kafka;
  5. class Process extends AbstractProcess
  6. {
  7. protected function run($arg)
  8. {
  9. go(function () {
  10. $config = new ProducerConfig();
  11. $config->setMetadataBrokerList('127.0.0.1:9092,127.0.0.1:9093');
  12. $config->setBrokerVersion('0.9.0');
  13. $config->setRequiredAck(1);
  14. $kafka = new Kafka($config);
  15. $result = $kafka->producer()->send([
  16. [
  17. 'topic' => 'test',
  18. 'value' => 'message--',
  19. 'key' => 'key--',
  20. ],
  21. ]);
  22. var_dump($result);
  23. var_dump('ok');
  24. });
  25. }
  26. }

消费者

  1. namespace App\Consumer;
  2. use EasySwoole\Component\Process\AbstractProcess;
  3. use EasySwoole\Kafka\Config\ConsumerConfig;
  4. use EasySwoole\Kafka\Kafka;
  5. class Process extends AbstractProcess
  6. {
  7. protected function run($arg)
  8. {
  9. go(function () {
  10. $config = new ConsumerConfig();
  11. $config->setRefreshIntervalMs(1000);
  12. $config->setMetadataBrokerList('127.0.0.1:9092,127.0.0.1:9093');
  13. $config->setBrokerVersion('0.9.0');
  14. $config->setGroupId('test');
  15. $config->setTopics(['test']);
  16. $config->setOffsetReset('earliest');
  17. $kafka = new Kafka($config);
  18. // 设置消费回调
  19. $func = function ($topic, $partition, $message) {
  20. var_dump($topic);
  21. var_dump($partition);
  22. var_dump($message);
  23. };
  24. $kafka->consumer()->subscribe($func);
  25. });
  26. }
  27. }

附赠

  1. Kafka 集群部署 docker-compose.yml 一份,使用方式如下
    1. 保证2181,9092,9093,9000端口未被占用(占用后可以修改compose文件中的端口号)
    2. 根目录下,docker-compose up -d
    3. 访问localhost:9000,可以查看kafka集群状态。

https://github.com/easy-swoole/kafka/blob/master/docker-compose.yml