kafka 测试遇到掉数据的问题 nmred/kafka-PHP

kafka-php 测试遇到掉数据的问题

遇到的现象

现象1.当消费者进程未启动时 生产者进行生产一条数据   待消费Lag为1 LOG-END-OFFSET-CURRENT-OFFSET=1
|GROUP|TOPIC|PARTITION|CURRENT-OFFSET|LOG-END-OFFSET|LAG| CONSUMER-ID|HOST|CLIENT-ID|
|--|--|--|--|--|--|--|--|--|
|test888|test888|0|25|26| 1| kafka-php-2078a3fb-17de-487a-bb09-290d3e4b124c|/127.0.0.1| kafka-php|

现象2. 启动生产者 不见消费数据的打印  但是CURRENT-OFFSET=LOG-END-OFFSET  显示已经消费了
现象3. 如果消费者进程先开启  再进行生产  不会出现2的情况   一切进展正常 

生产者

$topic = "test888";
        $key = '';
        // 创建kafka数据
        $config = ProducerConfig::getInstance();
        $config->setMetadataRefreshIntervalMs(10000);
        $config->setMetadataBrokerList('127.0.0.1:9092');
        $config->setBrokerVersion('1.0.0');

        /**
         * 生产者生产消息是否需要等待broker
         * 0 表示producer不需要等待来着broker同步完成的确认继续发送下一条消息 拥有最低的延迟性   但是数据可能会掉
         * 1 表示producer在leader成功接收消息且在成功确认后发送下一条message
         * -1 表示producer在follower副本确认接收到消息后 才算发送完成.
         */
        $config->setRequiredAck(-1); // 是否需要ack回报.
        $config->setIsAsyn(true); // 是否异步.
        $config->setProduceInterval(500);

        $producer = new Producer();

//        for($i=1;$i<=100;$i++) {
            $result = $producer->send([
                [
                    'topic' => $topic,
                    'value' =>json_encode(array('time' => date('Y-m-d H:i:s'))),
                    'key' => '',
                ],
            ]);
            echo "生产消息推入test队列:\n";
            var_dump($result);
//        }

消费者

 // 该主题对应的分区是多少个  就可以指定<=分区数的消费者队列个数.  使用supervisor管理
        $groupId = 'test888';
        $topic = ["test888"];
        // kafka消费者
        $config = ConsumerConfig::getInstance();
        $config->setMetadataRefreshIntervalMs(10000);
        $config->setMetadataBrokerList('127.0.0.1:9092');
        $config->setGroupId($groupId);
        $config->setBrokerVersion('1.0.0');
        $config->setTopics($topic);
        $config->setOffsetReset('earliest');
        var_dump($config->getOffsetReset());

        $consumer = new Consumer();
        $consumer->start(function ($topic,$part,$message) {
            echo "执行消费:".date('Y-m-d H:i:s')."\n";
            var_dump($message);
        })
本作品采用《CC 协议》,转载必须注明作者和本文链接
《L05 电商实战》
从零开发一个电商项目,功能包括电商后台、商品 & SKU 管理、购物车、订单管理、支付宝支付、微信支付、订单退款流程、优惠券等
《L04 微信小程序从零到发布》
从小程序个人账户申请开始,带你一步步进行开发一个微信小程序,直到提交微信控制台上线发布。
讨论数量: 2

这个问题确实存在 建议rdkafka

4年前 评论

这个后续是怎么解决的@

9个月前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!