目录
- 前言
- 安装
- 1. 下载
- 2. 目录
- 使用
- 1. 生产
- 2. 消费(从指定的 partition 消费)
- 其他
- 解决方法
前言
由于之前在 PHP 中使用 Kafka 是通过 composer 包的方式,由于nmred/kafka-php 很久没有维护,并且网上相关问题的文章也比较少。所以我这次换成 PHP 扩展RdKafka 继续使用,主要介绍扩展安装和这种方式的基本操作。
安装
1. 下载
地址(找到与自己环境匹配的就可以)
2. 目录
由于 php-rdkafka 依赖 librdkafka,linux 就需要先安装 librdkafka 后安装 php-rdkafka,而 windows 版本是如下几个文件,安装方法如下:
(1). 将 librdkafka.dll 和 librdkafka.pdb 放入 PHP 安装的根目录下,而 php_rdkafka.dll 和 php_rdkafka.pdb 放入 PHP 安装目录的 ext 下。
(2). php.ini 配置文件添加 extension=php_rdkafka.dll,最后重启 PHP。
(3). php-m 或这 phpinfo (); 就可以查看到扩展了。
通过get_declared_classes() 也可以查看到扩展里预设的函数了。
使用
1. 生产
public function kafkaTest()
{
$rk = new \\RdKafka\\Producer();
$rk->addBrokers(\”127.0.0.1:9092\”);
$topic = $rk->newTopic(\”shop\”);
$ret = [];
for ($i = 0; $i < 5; $i++) {
$content = \”第\” . $i . \”次发送失败\”;
$message = [\”mobile\” => \”15623652142\”, \”content\” => $content];
$payload = json_encode($message);
// 指定向0号partition生产数据
$ret[][\’produce_res\’] = $topic->produce(0, 0, $payload, \”sms_$i\”);
// 随机选择partition
//$topic->produce(RD_KAFKA_PARTITION_UA, 0, $payload);
if ($rk->getOutQLen() > 0) {
$ret[][\’produce_poll\’] = $rk->poll(500);
} else {
$ret[][\’produce_poll\’] = $rk->poll(0);
}
}
dump($ret);
}
2. 消费(从指定的 partition 消费)
protected function execute(Input $input, Output $output)
{
$output->writeln(\”!!!hello kafka!!!\”);
$conf = new \\RdKafka\\Conf();
$conf->set(\’group.id\’, \’sms-consumer-group\’);
$rk = new \\RdKafka\\Consumer($conf);
$rk->addBrokers(\”127.0.0.1:9092\”);
$topicConf = new \\RdKafka\\TopicConf();
$topicConf->set(\’auto.commit.interval.ms\’, 100);
$topicConf->set(\’offset.store.method\’, \’file\’);
$topicConf->set(\’offset.store.path\’, sys_get_temp_dir());
$topicConf->set(\’auto.offset.reset\’, \’smallest\’);
$topic = $rk->newTopic(\”shop\”, $topicConf);
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
while(true) {
// 设置消费时的时间间隔,单位毫秒,以下表示5秒消费一个
$message = $topic->consume(0, 5000);
if ($message) {
echo \”读取到消息\\n\\r\”;
// 消息对象,包括消息主题,消息创建时间戳,消息分区编号,消息主体,消息键名,消息长度等
var_dump($message);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
echo \”读取消息成功:\\n\\r\”;
var_dump($message->payload);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo \”读取消息失败\\n\\r\”;
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo \”请求超时\\n\\r\”;
break;
default:
throw new \\Exception($message->errstr(), $message->err);
break;
}
} else {
echo \”未读取到消息\\n\\r\”;
}
}
$output->writeln(\”!!!the end!!!\”);
}
其他
在执行消费过程中,发现 kafka 停止服务,抛出的异常:ERROR Shutdown broker because all log dirs in /tmp/kafka-logs have failed。
解决方法
删除 kafka-logs 下的所有日志,再重新启动 Kafaka,kafka-server-start.bat ….\\config\\server.properties &
到此这篇关于利用扩展的方式在PHP中使用Kafka的教程分享的文章就介绍到这了,更多相关PHP使用Kafka内容请搜索悠久资源网以前的文章或继续浏览下面的相关文章希望大家以后多多支持悠久资源网!
您可能感兴趣的文章:
- php测试kafka项目示例
- PHP扩展之kafka安装应用案例详解