宝塔下 php7.4 使用kafka

宝塔下 php7.4 使用kafka

先安装 rdkafka

GitHub – edenhill/librdkafka: The Apache Kafka C/C++ library

下载压缩包,丢到服务器上

cd 到压缩包目录,进行安装

cd librdkafka
./configure
make && make install

安装php-rdkafka扩展

wget https://github.com/arnaud-lb/php-rdkafka/archive/4.0.2.tar.gz
cd php-rdkafka-4.0.2/
/www/server/php/74/bin/phpize
./configure --with-php-config=/www/server/php/74/bin/php-config
make && make install

添加extension=rdkafka.so 到php.ini文件 放到文件最后一行即可

宝塔php7.4文件位置:

/www/server/php/74/etc/php.ini

/www/server/php/74/etc/php-cli.ini

然后重载一下php即可。

tp6的使用方法

composer安装kafka,命令:

composer require nmred/kafka-php

nmred/kafka-php – Packagist

代码:使用cli的方式调用消费者会502,调用消费者一般使用命令行方式

    //生产者
   public function index()
   {
        $config = \Kafka\ProducerConfig::getInstance();
        $config->setMetadataRefreshIntervalMs(10000);
        $config->setMetadataBrokerList('localhost:9092');
        $config->setBrokerVersion('1.0.0');
        $config->setRequiredAck(1);
        $config->setIsAsyn(false);
        $config->setProduceInterval(500);
        $producer = new \Kafka\Producer(
            function() {
                return [
                    [
                        'topic' => 'test',
                        'value' => 'test....message.',
                        'key' => 'testkey',
                    ],
                ];
            }
        );
        $producer->success(function($result) {
        	var_dump($result);
        });
        $producer->error(function($errorCode) {
        		var_dump($errorCode);
        });
        $producer->send(true);
   }
   //消费者
   public function xiaofei()
   {
        $config = \Kafka\ConsumerConfig::getInstance();
        $config->setMetadataRefreshIntervalMs(10000);
        $config->setMetadataBrokerList('localhost:9092');
        $config->setGroupId('test');
        $config->setBrokerVersion('1.0.0');
        $config->setTopics(['topicA']);
        //$config->setOffsetReset('earliest');
        $consumer = new \Kafka\Consumer();
        $consumer->start(function($topic, $part, $message) {
        	var_dump($message);
        });
   }

原生php调用:

也是使用composer包,命令:composer require nmred/kafka-php

目录结构:

宝塔下 php7.4 使用kafka

生产者文件:

<?php
require './vendor/autoload.php';
date_default_timezone_set('PRC');
$config = \Kafka\ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('localhost:9092');
$config->setBrokerVersion('1.0.0');
$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);
$producer = new \Kafka\Producer();
for($i = 0; $i < 10; $i++) {
    $result = $producer->send([
        [
            'topic' => 'topic',
            'value' => '今晚军训甄姬 '.$i.' 次',
            'key' => '',
        ],
    ]);
    var_dump($result);
}

消费者文件:

<?php
require './vendor/autoload.php';
date_default_timezone_set('PRC');
$config = \Kafka\ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('localhost:9092');
$config->setGroupId('test');
$config->setBrokerVersion('1.0.0');
$config->setTopics(array('topic'));
//$config->setOffsetReset('earliest');
$consumer = new \Kafka\Consumer();
$consumer->start(function ($topic, $part, $message) {
    var_dump($message);
});

运行生产者:php producer.php

宝塔下 php7.4 使用kafka

运行 消费者:php demo/consumer.php

宝塔下 php7.4 使用kafka

                       

点击阅读全文

上一篇 2023年 5月 27日 am11:16
下一篇 2023年 5月 27日 am11:17