以PHP视角探寻Kafka - 实现简易生产者功能

以PHP视角探寻Kafka - 实现简易生产者功能

代码先行

public function actionProducer()
{
  $conf = new \RdKafka\Conf();
  $conf->set('metadata.broker.list', 'broker地址');

  /**
  * kafka投递回调
  * RdKafka\Message Object
  * (
  * [err] => 0      //  等于0时说明投递成功
  * [topic_name] => topic_name    // 投递的topic名称
  * [timestamp] => 1637121418036    // 投递的时间
  * [partition] => 8            // 投递的分区
  * [payload] => Message 100    // 投递的消息
  * [len] => 11     // 消息的长度
  * [key] =>        // 每个消息的key 特殊用途(譬如通过key来跑不同的业务  key1创建 key2更改等等)
  * [offset] => 1   // 偏移量 
  * [headers] =>
  * [opaque] =>
  * )
  */
  $conf->setDrMsgCb(function ($kafka, $message) {
    if ($message->err) {
      echo '消息永久失败'."\n";
    } else {
      echo '消息发送成功'."\n";
    }
  });
  $producer = new \RdKafka\Producer($conf);
  $topic = $producer->newTopic("topic_name");
  for ($i = 0; $i < 10; $i++) {
    // RD_KAFKA_PARTITION_UA = -1
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message {$i}");
  }
  while ($producer->getOutQLen() > 0) {
    $producer->poll(1000);
  }

  for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
    $result = $producer->flush(500);
    // RD_KAFKA_RESP_ERR_NO_ERROR = 0
    if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
      break;
    }
  }

  if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
    throw new \RuntimeException('无法刷新, 消息可能会丢失!');
  }
}

代码分解

第一模块 - 配置

$conf = new \RdKafka\Conf();
$conf->set('metadata.broker.list', 'broker地址');
$conf->setDrMsgCb(function ($kafka, $message) {
  if ($message->err) {
    echo '消息永久失败'."\n";
  } else {
    echo '消息发送成功'."\n";
  }
});
  • 第一部分 new一个kafka配置的实例
  • 第二部分 设置kafka的broker地址 非必须 后续也可以用\Rdkafka\Producer::addBrokers()方法实现broker地址的添加
  • 第三部分 如果需要知道投递的结果,这个方法一定要实现,此方法是投递任务后的回调事件,==需要配合poll()方法使用==。返回值如下
RdKafka\Message Object
(
[err] => 0      //  等于0时说明投递成功
[topic_name] => topic_name    // 投递的topic名称
[timestamp] => 1637121418036    // 投递的时间
[partition] => 8            // 投递的分区
[payload] => Message 100    // 投递的消息
[len] => 11     // 消息的长度
[key] =>        // 每个消息的key 特殊用途(譬如通过key来跑不同的业务  key1创建 key2更改等等)
[offset] => 1   // 偏移量 
[headers] =>
[opaque] =>
)

第二模块 - 生成(可能会推送)

$producer = new \RdKafka\Producer($conf);
// $producer->addBrokers('broker地址');
$topic = $producer->newTopic("topic_name");
for ($i = 0; $i < 10; $i++) {
  // RD_KAFKA_PARTITION_UA = -1
  $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message {$i}");
}
  • 第一部分 new一个producer实例,参数是一个Conf对象,也就是【第一模块】里面的\Rdkafka\Conf()
  • 第二部分 注释掉的代码和【第一模块】中的【第二部分】的set方式二选一即可
  • 第三部分 创建一个topic

官方解释如下 中文为百度翻译

Creates a new topic instance for topic_name.

为主题名称创建新的主题实例

  • 第四部分 生成并投递数据 ==这个方法很重要==

官方解释如下 中文为百度翻译

Produce and send a single message

生产并发送一条消息

Note:
Since producing is asynchronous, you should call flush before you destroy the producer. Otherwise, any outstanding messages will be silently discarded.

由于生产是异步的,您应该在销毁生产者之前调用flush。否则,任何未完成的消息将被静默丢弃。

==如果代码到此结束的话,消息99%可能会丢失,因为生成是异步,可能会出现程序执行完了,消息还未投递的情况,所以可能会导致消息的丢失==

实验 将程序睡眠2秒钟,消息其实是可以推送的,但是如果消息特别多的话,睡眠2S可能推送不完,这种情况没有实验

$producer = new \RdKafka\Producer($conf);
// $producer->addBrokers('broker地址');
$topic = $producer->newTopic("topic_name");
for ($i = 0; $i < 10; $i++) {
  // RD_KAFKA_PARTITION_UA = -1
  $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message {$i}");
}
sleep(2);    // 程序休眠

第三模块 - 确认成功机制

while ($producer->getOutQLen() > 0) {
  $producer->poll(1000);
}

for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
  $result = $producer->flush(500);
  // RD_KAFKA_RESP_ERR_NO_ERROR = 0
  if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
    break;
  }
}
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
  throw new \RuntimeException('无法刷新, 消息可能会丢失!');
}
  • 第一部分

    • while循环条件$producer->getOutQLen()

    官方解释如下 中文为百度翻译

    Polls for events, cause application provided callbacks to be called.

    事件轮询,导致应用程序提供的回调被调用。

    Note:

    An application using a sub-class of RdKafka should make sure to call poll() at regular intervals to serve any queued callbacks waiting to be called.

    使用RdKafka子类的应用程序应确保定期调用poll(),定期为等待调用的任何排队回调提供服务。

    Returns the number of messages in the out queue.

    返回输出队列中的消息数

    • 循环体 poll() 如果不执行这个方法,【第一模块】的回调事件将无法执行

    ==关于poll()的参数,个人理解是等待回调的阻塞时间,譬如1秒钟时间回调还没有执行成功则放弃回调,如有不对的还望指正==

    官方解释如下 中文为百度翻译

    Returns the number of events served.

    返回服务的事件数.

  • 第二部分

    • for循环次数,个人建议是通过$producer->getOutQLen()方法来决定次数
    • 循环体中plush()方法

    ==关于flush()方法,个人理解是:==这个方法相当于一个兜底的poll()方法,如果前面没有执行poll()方法,那么flush()方法也会帮我们去执行poll(),从而确保消息全部推送,返回值如果是0,则说明全部推送成功,反之消息可能会丢失

    官方解释如下 中文为百度翻译

    Wait until all outstanding produce requests, et.al, are completed. This should typically be done prior to destroying a producer instance to make sure all queued and in-flight produce requests are completed before terminating. This function will call poll() and thus trigger callbacks.

    等到所有未完成的产品请求等完成。这通常应该在销毁生产者实例之前完成,以确保在终止之前完成所有排队和进行中的生产请求。此函数将调用 poll() 并因此触发回调。

    In case of success returns RD_KAFKA_RESP_ERR_NO_ERROR, in case of timeout RD_KAFKA_RESP_ERR__TIMED_OUT and if not called on a producer instance RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED.

    在成功的情况下返回RD_KAFKA_RESP_ERR_NO_ERROR,在超时的情况下,RD_KAFKA_RESP_ERR__TIMED_OUT如果没有在生产者实例上调用RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED

以上是本人结合官方文档整理的一份最基础的生产者代码分析,如果不对还望大家指正!!!!!!

Rdkafka文档:arnaud.le-blanc.net/php-rdkafka-do...

本作品采用《CC 协议》,转载必须注明作者和本文链接
《L04 微信小程序从零到发布》
从小程序个人账户申请开始,带你一步步进行开发一个微信小程序,直到提交微信控制台上线发布。
《L03 构架 API 服务器》
你将学到如 RESTFul 设计风格、PostMan 的使用、OAuth 流程,JWT 概念及使用 和 API 开发相关的进阶知识。
讨论数量: 1

写的挺详细的,还有官方文档做支撑,整不赖!

2年前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!