MySQL 数据同步到 Elasticsearch

要使用 PHPCanal 捕获的 MySQL 数据同步到 Elasticsearch,我们可以按照以下步骤,创建一个完整的系统来实现这个目标。这个流程包括以下步骤:

  1. 配置 Canal 来捕获 MySQL 数据的变更(通过 binlog)。
  2. PHP 消费 Kafka 消息,并将这些消息同步到 Elasticsearch。

以下是实现步骤:

1. 配置 Canal 捕获 MySQL 数据变化

假设你已经完成了 Canal 的安装和配置。Canal 将 MySQL 的增量数据推送到 Kafka。具体步骤可以参考上面的教程,确保 Canal 成功地捕获 MySQL 的 binlog 数据并将其推送到 Kafka。

2. 配置 Kafka

在 Canal 中,我们需要确保 Kafka 的适配器已正确配置,Canal 将数据推送到 Kafka,Kafka 消息格式将是 MySQL 数据的变更记录(如:插入、更新或删除)。

Canal 配置示例

canal.properties 中进行如下配置:

# Canal 启用 Kafka 适配器
canal.instance.adapter.kafka.enable=true
canal.instance.adapter.kafka.bootstrap.servers=127.0.0.1:9092  # Kafka broker 地址
canal.instance.adapter.kafka.topic=mydb-mytable                  # Kafka 主题
canal.instance.adapter.kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer
canal.instance.adapter.kafka.value.serializer=org.apache.kafka.common.serialization.StringSerializer

3. 安装 PHP 依赖

你需要安装一些 PHP 库来实现 Kafka 消费和 Elasticsearch 写入。

1. 安装 Kafka 消费者库 (php-rdkafka)

首先需要安装 php-rdkafka 扩展,PHP 才能与 Kafka 进行交互。

# 安装 librdkafka(Kafka 的 C 客户端)
sudo apt-get install librdkafka-dev

# 安装 PHP 扩展
pecl install rdkafka

# 安装 Composer 依赖
composer require edenhill/php-rdkafka

2. 安装 Elasticsearch PHP 客户端

composer require elasticsearch/elasticsearch

4. 编写 PHP 脚本:消费 Kafka 消息并同步到 Elasticsearch

在这个 PHP 脚本中,我们将创建 Kafka 消费者,消费从 Canal 推送到 Kafka 的消息,并将它们同步到 Elasticsearch 中。

<?php
require 'vendor/autoload.php'; // 引入 Composer 自动加载文件

use RdKafka\Consumer;
use RdKafka\ConsumerTopic;
use Elasticsearch\ClientBuilder;

// Kafka 配置
$kafkaBroker = '127.0.0.1:9092';  // Kafka broker 地址
$kafkaTopic = 'mydb-mytable';      // Canal 推送到 Kafka 的主题

// 创建 Kafka 消费者
$consumer = new Consumer();
$consumer->addBrokers($kafkaBroker);

// 订阅 Kafka 主题
$topic = $consumer->newTopic($kafkaTopic);
$topic->consumeStart(0, RD_KAFKA_OFFSET_END); // 从最新的 offset 开始消费

// 创建 Elasticsearch 客户端
$client = ClientBuilder::create()->setHosts(['127.0.0.1:9200'])->build();

// 开始消费 Kafka 消息
while (true) {
    $message = $topic->consume(0, 1000); // 获取 Kafka 消息(阻塞模式)

    if ($message->err) {
        echo "Error: {$message->errstr()}\n";
        continue;
    }

    // Kafka 消息体是 JSON 格式的 MySQL 数据变更记录
    $record = json_decode($message->payload, true);

    // 解析消息,假设 record 中包含了一个 'id' 和 'data' 字段
    // 这些字段会根据你数据库表的字段来进行调整
    if ($record) {
        // 假设 record 包含 MySQL 数据的 ID 和变更数据
        $index = 'myindex';   // Elasticsearch 索引
        $type = '_doc';        // Elasticsearch 类型(如果不使用类型,写 null)
        $id = $record['id'];   // 数据的 ID(如主键)
        $data = $record['data']; // 数据内容

        // 向 Elasticsearch 写入数据
        try {
            $params = [
                'index' => $index,
                'id' => $id,
                'body' => $data
            ];
            $response = $client->index($params);  // 插入/更新数据
            echo "Data inserted into Elasticsearch: " . json_encode($response) . "\n";
        } catch (Exception $e) {
            echo "Error inserting data into Elasticsearch: " . $e->getMessage() . "\n";
        }
    }
}
?>

5. 解释 PHP 脚本的工作原理

1. Kafka 消费者配置

  • 使用 php-rdkafka 创建 Kafka 消费者,配置 Kafka 的 bootstrap.servers 和消费的主题 mydb-mytable
  • 通过 consumeStart 方法设置从 Kafka 主题中获取消息,并从最新的偏移量开始消费。

2. 从 Kafka 获取消息

  • 使用 consume(0, 1000) 方法从 Kafka 拉取消息,这里的 1000 是拉取的超时设置(以毫秒为单位)。你可以根据需要调整这个值。

3. 处理 Kafka 消息并将数据写入 Elasticsearch

  • 将 Kafka 消息解码为 JSON 格式(json_decode),并获取其中的数据(例如主键 id 和数据内容 data)。
  • 将这些数据通过 Elasticsearch PHP 客户端插入到指定的索引中。

4. Elasticsearch 插入

  • 使用 index 方法向 Elasticsearch 插入或更新数据。如果 Elasticsearch 中已存在相同的 id,它将更新该文档。

6. 运行 PHP 脚本

使用 PHP CLI 来运行这个脚本,并使其持续运行,从 Kafka 消费数据并将其同步到 Elasticsearch。

php consume_kafka_to_es.php

这个脚本将持续运行并消费 Kafka 中的消息,确保数据实时同步到 Elasticsearch。

7. 总结

通过上述步骤,你可以实现使用 PHPCanal 捕获的 MySQL 数据实时同步到 Elasticsearch。整个流程如下:

  1. Canal 捕获 MySQL 的增量数据,并将其推送到 Kafka
  2. PHP 使用 php-rdkafka 库消费 Kafka 消息。
  3. 使用 Elasticsearch PHP 客户端 将变更数据同步到 Elasticsearch。

这种方法可以保证 MySQL 数据和 Elasticsearch 之间的实时同步,适合大多数实时数据同步的场景。如果需要更复杂的错误处理或数据处理逻辑,可以在此基础上扩展。

本作品采用《CC 协议》,转载必须注明作者和本文链接
《L04 微信小程序从零到发布》
从小程序个人账户申请开始,带你一步步进行开发一个微信小程序,直到提交微信控制台上线发布。
《L03 构架 API 服务器》
你将学到如 RESTFul 设计风格、PostMan 的使用、OAuth 流程,JWT 概念及使用 和 API 开发相关的进阶知识。
讨论数量: 2

老生常谈的话题。 cdc的工具可以关注最近flink新出了个框架 flink-cdc正好支持es的sink, 不用写任何代码丢进去就能用

file

2个月前 评论
晏南风 2个月前

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!