kafka
本例子以https://github.com/weiboad/kafka-php作为客户端。使用composer安装时请先看EasySwoole文档中自动加载的章节,
为EasySwoole引入composer。
如何在EasySwoole中添加自定义阻塞进程
EasySwoole支持在beforeWorker事件中添加自定义进程参与swoole底层的事件循环,具体实例代码为:
$server->addProcess(new \swoole_process(function (){
while(1){
}
}));
kafka消费者
$server->addProcess(new \swoole_process(function (){
$config = \Kafka\ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('0.0.0.0:9092');
$config->setGroupId('test');
$config->setBrokerVersion('0.9.0.1');
$config->setTopics(array('test'));
$consumer = new \Kafka\Consumer();
$consumer->start(function($topic, $part, $message) {
Logger::getInstance()->log($message);
});
}));
我们向该topic发生一个消息,可见
array(3) {
["offset"]=>
int(0)
["size"]=>
int(19)
["message"]=>
array(6) {
["crc"]=>
int(2275900082)
["magic"]=>
int(0)
["attr"]=>
int(0)
["timestamp"]=>
int(0)
["key"]=>
string(0) ""
["value"]=>
string(5) "hello"
}
}
以上例子仅为示例,具体使用请见kafka-php文档