MySQL 数据同步到 Elasticsearch
要使用 PHP 将 Canal 捕获的 MySQL 数据同步到 Elasticsearch,我们可以按照以下步骤,创建一个完整的系统来实现这个目标。这个流程包括以下步骤:
- 配置 Canal 来捕获 MySQL 数据的变更(通过 binlog)。
- PHP 消费 Kafka 消息,并将这些消息同步到 Elasticsearch。
以下是实现步骤:
1. 配置 Canal 捕获 MySQL 数据变化
假设你已经完成了 Canal 的安装和配置。Canal 将 MySQL 的增量数据推送到 Kafka。具体步骤可以参考上面的教程,确保 Canal 成功地捕获 MySQL 的 binlog 数据并将其推送到 Kafka。
2. 配置 Kafka
在 Canal 中,我们需要确保 Kafka 的适配器已正确配置,Canal 将数据推送到 Kafka,Kafka 消息格式将是 MySQL 数据的变更记录(如:插入、更新或删除)。
Canal 配置示例
在 canal.properties
中进行如下配置:
# Canal 启用 Kafka 适配器
canal.instance.adapter.kafka.enable=true
canal.instance.adapter.kafka.bootstrap.servers=127.0.0.1:9092 # Kafka broker 地址
canal.instance.adapter.kafka.topic=mydb-mytable # Kafka 主题
canal.instance.adapter.kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer
canal.instance.adapter.kafka.value.serializer=org.apache.kafka.common.serialization.StringSerializer
3. 安装 PHP 依赖
你需要安装一些 PHP 库来实现 Kafka 消费和 Elasticsearch 写入。
1. 安装 Kafka 消费者库 (php-rdkafka
)
首先需要安装 php-rdkafka
扩展,PHP 才能与 Kafka 进行交互。
# 安装 librdkafka(Kafka 的 C 客户端)
sudo apt-get install librdkafka-dev
# 安装 PHP 扩展
pecl install rdkafka
# 安装 Composer 依赖
composer require edenhill/php-rdkafka
2. 安装 Elasticsearch PHP 客户端
composer require elasticsearch/elasticsearch
4. 编写 PHP 脚本:消费 Kafka 消息并同步到 Elasticsearch
在这个 PHP 脚本中,我们将创建 Kafka 消费者,消费从 Canal 推送到 Kafka 的消息,并将它们同步到 Elasticsearch 中。
<?php
require 'vendor/autoload.php'; // 引入 Composer 自动加载文件
use RdKafka\Consumer;
use RdKafka\ConsumerTopic;
use Elasticsearch\ClientBuilder;
// Kafka 配置
$kafkaBroker = '127.0.0.1:9092'; // Kafka broker 地址
$kafkaTopic = 'mydb-mytable'; // Canal 推送到 Kafka 的主题
// 创建 Kafka 消费者
$consumer = new Consumer();
$consumer->addBrokers($kafkaBroker);
// 订阅 Kafka 主题
$topic = $consumer->newTopic($kafkaTopic);
$topic->consumeStart(0, RD_KAFKA_OFFSET_END); // 从最新的 offset 开始消费
// 创建 Elasticsearch 客户端
$client = ClientBuilder::create()->setHosts(['127.0.0.1:9200'])->build();
// 开始消费 Kafka 消息
while (true) {
$message = $topic->consume(0, 1000); // 获取 Kafka 消息(阻塞模式)
if ($message->err) {
echo "Error: {$message->errstr()}\n";
continue;
}
// Kafka 消息体是 JSON 格式的 MySQL 数据变更记录
$record = json_decode($message->payload, true);
// 解析消息,假设 record 中包含了一个 'id' 和 'data' 字段
// 这些字段会根据你数据库表的字段来进行调整
if ($record) {
// 假设 record 包含 MySQL 数据的 ID 和变更数据
$index = 'myindex'; // Elasticsearch 索引
$type = '_doc'; // Elasticsearch 类型(如果不使用类型,写 null)
$id = $record['id']; // 数据的 ID(如主键)
$data = $record['data']; // 数据内容
// 向 Elasticsearch 写入数据
try {
$params = [
'index' => $index,
'id' => $id,
'body' => $data
];
$response = $client->index($params); // 插入/更新数据
echo "Data inserted into Elasticsearch: " . json_encode($response) . "\n";
} catch (Exception $e) {
echo "Error inserting data into Elasticsearch: " . $e->getMessage() . "\n";
}
}
}
?>
5. 解释 PHP 脚本的工作原理
1. Kafka 消费者配置
- 使用
php-rdkafka
创建 Kafka 消费者,配置 Kafka 的bootstrap.servers
和消费的主题mydb-mytable
。 - 通过
consumeStart
方法设置从 Kafka 主题中获取消息,并从最新的偏移量开始消费。
2. 从 Kafka 获取消息
- 使用
consume(0, 1000)
方法从 Kafka 拉取消息,这里的1000
是拉取的超时设置(以毫秒为单位)。你可以根据需要调整这个值。
3. 处理 Kafka 消息并将数据写入 Elasticsearch
- 将 Kafka 消息解码为 JSON 格式(
json_decode
),并获取其中的数据(例如主键id
和数据内容data
)。 - 将这些数据通过 Elasticsearch PHP 客户端插入到指定的索引中。
4. Elasticsearch 插入
- 使用
index
方法向 Elasticsearch 插入或更新数据。如果 Elasticsearch 中已存在相同的id
,它将更新该文档。
6. 运行 PHP 脚本
使用 PHP CLI 来运行这个脚本,并使其持续运行,从 Kafka 消费数据并将其同步到 Elasticsearch。
php consume_kafka_to_es.php
这个脚本将持续运行并消费 Kafka 中的消息,确保数据实时同步到 Elasticsearch。
7. 总结
通过上述步骤,你可以实现使用 PHP 将 Canal 捕获的 MySQL 数据实时同步到 Elasticsearch。整个流程如下:
- Canal 捕获 MySQL 的增量数据,并将其推送到 Kafka。
- PHP 使用 php-rdkafka 库消费 Kafka 消息。
- 使用 Elasticsearch PHP 客户端 将变更数据同步到 Elasticsearch。
这种方法可以保证 MySQL 数据和 Elasticsearch 之间的实时同步,适合大多数实时数据同步的场景。如果需要更复杂的错误处理或数据处理逻辑,可以在此基础上扩展。
本作品采用《CC 协议》,转载必须注明作者和本文链接
老生常谈的话题。 cdc的工具可以关注最近flink新出了个框架 flink-cdc正好支持es的sink, 不用写任何代码丢进去就能用