Laravel 实现 Kafka 消息推送与接收处理

安装环境要求
PHP 版本大于 7.0
Kafka Server 版本大于 0.8.0
消费模块 Kafka Server 版本需要大于 0.9.0

安装
使用 Composer 安装
添加 composer 依赖 nmred/kafka-php 到项目的 composer.json 文件中即可,如:

 "require": {
        "php": ">=5.6.4",
        "laravel/lumen-framework": "5.4.*",
        "nmred/kafka-php": "dev-master"
    },

创建 KafkaService

<?php
namespace App\Http\Services;
use Kafka;

class KafkaService 
{
    public function __construct()
    {
        date_default_timezone_set('PRC');
    }

    /*
     * Produce
     */
    public function Producer($topic, $value , $url)
    {
        $config = \Kafka\ProducerConfig::getInstance();
        $config->setMetadataRefreshIntervalMs(10000);
        $config->setMetadataBrokerList($url);
        $config->setBrokerVersion('1.0.0');
        $config->setRequiredAck(1);
        $config->setIsAsyn(false);
        $config->setProduceInterval(500);
        $producer = new \Kafka\Producer(function () use($value,$topic){
            return [
                [
                    'topic' => $topic,
                    'value' => $value,
                    'key' => '',
                ],
            ];
        });
        $producer->success(function ($result){
            return "success";
        });
        $producer->error(function ($errorCode){
            var_dump($errorCode);
        });
        $producer->send(true);
    }

    /*
     * Consumer
     */
    public function consumer($group,$topics , $url){
        $config = \Kafka\ConsumerConfig::getInstance();
        $config->setMetadataRefreshIntervalMs(500);
        $config->setMetadataBrokerList($url);
        $config->setGroupId($group);
        $config->setBrokerVersion('1.0.0');
        $config->setTopics([$topics]);
        $config->setOffsetReset('earliest');
        $consumer = new \Kafka\Consumer();
        $consumer->start(function($topic, $part, $message) {
            echo "receive a message...\n";
            app('consumerKafka')->consumerData($message['message']['value']);  //你的接收处理逻辑
            var_dump($message['message']['value']);
        });
    }
}

执行produce方法生产消息

<?php
namespace App\Http\Services;
use App\Http\Services\KafkaService;
class ProduceService
{
 public function produce()
    {
        $topic = env('topic_test'); //配置在env中
        $url = env('kafka_url_test'); //配置在env中
        $value =
            [
                'code' => 'test',
                'data_type' => 'personal',
                'action' => 'update',
                'data' =>
                    [
                        'id' => 1,
                        'name' => 'tom',
                        'gender' => 2
                    ],
                'redirect_url' => '',
                'operator' => 'system',
            ];
        $value = json_encode ($value, JSON_FORCE_OBJECT );
        $kafka = new KafkaService();
        $kafka->Producer($topic, $value , $url);
    }

}

执行php artisan consumer:kafka 消费消息

<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\Redis;

class ConsumerKafka extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'consumer:kafka';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = '处理异步kafka消息';

    /**
     * Create a new command instance.
     */
    public function __construct()
    {
        parent::__construct();
    }
    /**
     * Execute the console command.
     *
     * @return mixed
     */
    public function handle()
    {
        $this->log('开始监听消息...');
        app('kafkaService')->consumer($group=env('KAFKA_GROUP'),$topics =env('KAFKA_TOPIC'), $url=env('KAFKA_URL'));
        return $this;
    }

    private function log($msg = '')
    {
        if (!$msg) {
            return $this;
        }
        if (php_sapi_name() == 'cli') {
            echo $msg, PHP_EOL;
        }
        app('myLog')->lumenLog($msg, 'kafka_consumer');
        return $this;
    }
}
本作品采用《CC 协议》,转载必须注明作者和本文链接
本帖由系统于 5年前 自动加精
《L04 微信小程序从零到发布》
从小程序个人账户申请开始,带你一步步进行开发一个微信小程序,直到提交微信控制台上线发布。
《L02 从零构建论坛系统》
以构建论坛项目 LaraBBS 为线索,展开对 Laravel 框架的全面学习。应用程序架构思路贴近 Laravel 框架的设计哲学。
讨论数量: 15
newbing

不知道是否可以适配一个Laravel Queue 的 driver 出来。另外 Kafka 2.0 发布了。

5年前 评论

我按照你的方式,执行 返回 1000 , 但是服务器没有打印出消息啊

5年前 评论
kafkaasaxa 4年前
ashishnimrot

Class kafkaService does not exist {"exception":"[object] (ReflectionException(code: -1): Class kafkaService does not exist at vendor/laravel/framework/src/Illuminate/Container/Container.php:752)

5年前 评论

请问,消费者怎么主动设置offset呢

4年前 评论

@ashishnimrot 请问这个问题解决了吗?我也遇到这个问题

4年前 评论

@jamestan 换个写法 直接new这个类调用方法就可以了

4年前 评论
chengxuyuan_liaoli

这个代码没有出现两次请求吗

4年前 评论
chengxuyuan_liaoli (作者) 4年前
chengxuyuan_liaoli

消费者这两个参数是啥 $topic, $part

4年前 评论

我也遇到了“输出1000”的问题, 最后排除到原因是kafka-php这个库没有详细报出具体错误信息,使用debug模式就可以看到详细的日志输出了。
以下是我安装rdkafka定位到问题的

  1. 安装C写的rdkafka扩展

    安装librdkafka 库
    git clone https://github.com/edenhill/librdkafka.git
    ./configure
    make
    sudo make install
    安装rdkafka
    sudo pecl install rdkafka
    配置扩展到php.ini
    extension=rdkafka.so
  2. 写代码测试

    $conf = new \RdKafka\Conf();
        // enable debug mode
        $conf->set('log_level', LOG_DEBUG);
        $conf->set('debug', 'all');
        $rk = new \RdKafka\Producer($conf);
        $rk->addBrokers("192.168.5.170:9092,192.168.5.99:9092");
    
        $topic = $rk->newTopic("php_topic");
    
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload");
    
        $producer = new \RdKafka\Producer();
        $producer->addBrokers("192.168.5.170:9092,192.168.5.99:9092");
    
        $obj_topic = $producer->newTopic("php_topic");
    
        //
        $input_handler = fopen('php://stdin', 'r');
    
        while (true) {
            echo "\nPlease input  messages:\n";
            $payload = trim(fgets($input_handler));
    
            // empty message will be quit
            if (empty($payload)) {
                break;
            }
    
            // send message
            $obj_topic->produce(RD_KAFKA_PARTITION_UA, 0, $payload);
        }
    
        echo "done\n";
        exit(0);
  3. 检查输出日志发现无法解析服务器的错误,后来才发现是公司运维配置了本地host映射。在自己服务器上/etc/hosts配置好映射就好了。
    %3|1571918912.221|FAIL|rdkafka#producer-1| [thrd:server5:9092/5]: server5:9092/5: Failed to resolve 'server5:9092': Name or service not known (after 3ms in state CONNECT)

  4. 这个时候去用kafka-php库也能正常工作了。

以上是我踩坑排除问题的思路,希望能帮助到大家:)

4年前 评论

我创建topic 1000什么原因 求解

3年前 评论
mar_he

我使用的是kafka-php的包,你们有没有遇到不能主动创建主题的问题。如果生产者使用的主题,没有在zookeeper中注册的话,生产者会报1000的错误。如果使用已创建的主题就不会有问题。

3年前 评论

1000的问题怎么解决

2年前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!