1、首先安装kafka扩展
#安装librdkafka: 版本: https://github.com/edenhill/librdkafka/releases/tag/v0.9.2$ git clone https://github.com/edenhill/librdkafka.git$ ./configure$ make$ sudo make install#安装 rdkafka.so 版本:https://github.com/arnaud-lb/php-rdkafka/releases/tag/3.0.1$ git clone https://github.com/arnaud-lb/php-rdkafka.git$ cd php-rdkafka$ phpize$ ./configure$ make all -j 5$ sudo make install
2、生产者代码示例
$rcf = new RdKafka\Conf();
$rcf->set('group.id', 'test'); //topicname $cf = new RdKafka\TopicConf(); $cf->set('offset.store.method', 'broker'); $cf->set('auto.offset.reset', 'smallest'); $rk = new RdKafka\Producer($rcf); $rk->setLogLevel(LOG_DEBUG); $rk->addBrokers("127.0.0.1"); //brokeraddr $topic = $rk->newTopic("test", $cf); //topicname for($i = 0; $i < 10; $i++) { $topic->produce(0,0,'test' . $i); }3、消费者代码示例$rcf = new RdKafka\Conf();
$rcf->set('group.id', 'test'); $rcf->set('broker.version.fallback', '0.8.2'); //brokername,kafkaversion $cf = new RdKafka\TopicConf(); $cf->set('auto.offset.reset', 'smallest'); $cf->set('auto.commit.enable', true); $rk = new RdKafka\Consumer($rcf); $rk->setLogLevel(LOG_DEBUG); $rk->addBrokers("127.0.0.1"); //brokeraddr $topic = $rk->newTopic("test", $cf); //topicname,topicobject $topic->consumeStart(0,10); //partition,offset $msg = $topic->consume(0, 1000); //partition,timeout var_dump($msg);