pulsar sink clickhouse
- Goal: sink the data of a Pulsar topic into ClickHouse
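Create the sink database and table
create database if not exists pulsar_clickhouse_jdbc_sink;
create table pulsar_clickhouse_jdbc_sink.pulsar_clickhouse_jdbc_sink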
(
userId UInt64,
score UInt8,
EventDate Date
)
engine = MergeTree()
PARTITION BY toYYYYMM(EventDate)
ORDER BY (EventDate,intHash32(userId))
SAMPLE BY intHash32(userId);
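`PARTITION BY toYYYYMM(EventDate)` gives each calendar month its own partition. For intuition only, here is a minimal Java sketch of the key that expression produces. ClickHouse computes it server-side; `PartitionKeySketch` is a hypothetical name.

```java
import java.time.LocalDate;

// Hypothetical helper: mirrors ClickHouse's toYYYYMM(EventDate),
// i.e. year * 100 + month, the value used as the partition key here.
final class PartitionKeySketch {
    static int toYYYYMM(LocalDate eventDate) {
        return eventDate.getYear() * 100 + eventDate.getMonthValue();
    }

    public static void main(String[] args) {
        // Two dates in the same month share a partition; the next month starts a new one.
        System.out.println(toYYYYMM(LocalDate.of(2021, 3, 1)));   // 202103
        System.out.println(toYYYYMM(LocalDate.of(2021, 3, 31)));  // 202103
        System.out.println(toYYYYMM(LocalDate.of(2021, 4, 1)));   // 202104
    }
}
```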
Create the ClickHouse sink config file (saved as clickhouse.yaml and passed to --sink-config-file below)
configs:
  userName: "default"
  password: ""
  jdbcUrl: "jdbc:clickhouse://localhost:8123/pulsar_clickhouse_jdbc_sink"
  tableName: "pulsar_clickhouse_jdbc_sink"
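The `jdbcUrl` combines the host, the port (8123 is ClickHouse's default HTTP port) and the database in a single string. The unqualified `tableName` is presumably resolved inside that database. That would explain why the query at the end reads from `pulsar_clickhouse_jdbc_sink.pulsar_clickhouse_jdbc_sink`. Below is a stdlib-only sketch for pulling those parts out of the URL as a sanity check. `JdbcUrlSketch` is a hypothetical helper and is not part of the sink.

```java
import java.net.URI;

// Hypothetical helper: splits a jdbcUrl such as the one in clickhouse.yaml
// into host, port and database so they can be checked against the table setup.
final class JdbcUrlSketch {
    static URI parse(String jdbcUrl) {
        String prefix = "jdbc:";
        if (!jdbcUrl.startsWith(prefix)) {
            throw new IllegalArgumentException("not a JDBC URL: " + jdbcUrl);
        }
        // Drop "jdbc:" so the rest ("clickhouse://host:port/db") parses as a URI.
        return URI.create(jdbcUrl.substring(prefix.length()));
    }

    // The database is the first path segment, without the leading slash.
    static String database(String jdbcUrl) {
        String path = parse(jdbcUrl).getPath();
        return path.startsWith("/") ? path.substring(1) : path;
    }

    public static void main(String[] args) {
        String url = "jdbc:clickhouse://localhost:8123/pulsar_clickhouse_jdbc_sink";
        URI uri = parse(url);
        System.out.println(uri.getHost());   // localhost
        System.out.println(uri.getPort());   // 8123
        System.out.println(database(url));   // pulsar_clickhouse_jdbc_sink
    }
}
```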
Create the sink connector on Pulsar
Run the ClickHouse sink locally (localrun)
./pulsar-admin sinks localrun \
--archive /root/apache-pulsar-2.7.0/connectors/pulsar-io-jdbc-clickhouse-2.7.0.nar \
--tenant public \
--namespace default \
--name sink-test-clickhouse \
--inputs test_clickhouse \
--sink-config-file /root/apache-pulsar-2.7.0/bin/clickhouse.yaml
If Pulsar cannot find the sink type name, download the matching connector from archive.apache.org/dist/pulsar/pul... and put it in the connectors directory under the Pulsar root directory.
Enable automatic schema updates (schema auto-registration) on the namespace
./pulsar-admin namespaces set-is-allow-auto-update-schema --enable public/default
Define the POJO for the messages published to the topic
package com.example.demo.utils;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
// Message payload; the field names mirror the ClickHouse column names
@Data
public class UserEvent implements Serializable {
    private Long userId;
    private Integer score;
    private Date EventDate;
}
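`EventDate` is the only field that starts with an uppercase letter. It is also the only column whose values later come back as 1970-01-01. One plausible explanation, not verified here: Lombok's `@Data` generates `getEventDate()`, and a getter-based JSON serializer such as Jackson (assumed to be what Pulsar's JSON schema uses) names that property `eventDate`. A sink that looks up the column `EventDate` by name would then find nothing. The stdlib sketch below flags columns affected this way. `PropertyNameCheck` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical check, assuming Jackson-style naming: the JSON property name
// comes from the Lombok getter (getEventDate -> eventDate), not the field.
final class PropertyNameCheck {
    // Field "EventDate" -> getter "getEventDate" -> property "eventDate".
    // Simplified: Jackson also lowercases further leading capitals (e.g. "URL").
    static String propertyFromField(String fieldName) {
        return Character.toLowerCase(fieldName.charAt(0)) + fieldName.substring(1);
    }

    // Columns whose name differs from the property the serializer would emit.
    static List<String> mismatches(List<String> columns) {
        List<String> result = new ArrayList<>();
        for (String column : columns) {
            if (!propertyFromField(column).equals(column)) {
                result.add(column);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Columns of pulsar_clickhouse_jdbc_sink, which the POJO fields mirror.
        List<String> columns = List.of("userId", "score", "EventDate");
        for (String column : mismatches(columns)) {
            System.out.println(column + " -> " + propertyFromField(column)); // EventDate -> eventDate
        }
    }
}
```

If this turns out to be the cause, one option to try (untested here) is to name the column and field consistently, for example `eventDate` in both places.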
Send data to the topic, then check whether it was sunk into ClickHouse
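package com.example.demo.utils;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import java.util.Calendar;
public class ProducerDemo {
    public static void main(String[] args) throws PulsarClientException {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        // JSON schema derived from the UserEvent POJO; print its definition for inspection
        var ch = JSONSchema.of(UserEvent.class);
        System.out.println("Schema definition:");
        System.out.println(ch.getSchemaInfo().getSchemaDefinition());
        Producer<UserEvent> producer = client.newProducer(ch)
                .enableBatching(true)
                .topic("test_clickhouse") // the topic the sink reads via --inputs
                .create();
        UserEvent ue = new UserEvent();
        ue.setUserId(1L);
        ue.setScore(20);
        ue.setEventDate(Calendar.getInstance().getTime());
        producer.send(ue);
        // Release the producer and the client when done
        producer.close();
        client.close();
    }
}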
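On the sink side, a JDBC-style connector conceptually turns each record into a parameterized INSERT over the table's columns and binds the record's values by column name. The sketch below only illustrates that idea with plain strings and maps. `buildInsertSql` and `bindValues` are hypothetical names, not the Pulsar connector's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical illustration of a JDBC-style sink: one parameterized INSERT per
// table, with each record's values bound in column order. Not Pulsar's code.
final class InsertSqlSketch {
    // Builds "INSERT INTO t(c1,c2) VALUES(?,?)" for a table and its columns.
    static String buildInsertSql(String table, List<String> columns) {
        StringJoiner names = new StringJoiner(",", "(", ")");
        StringJoiner marks = new StringJoiner(",", "(", ")");
        for (String column : columns) {
            names.add(column);
            marks.add("?");
        }
        return "INSERT INTO " + table + names + " VALUES" + marks;
    }

    // Looks up each column by name in the record; a missing name yields null.
    static List<Object> bindValues(Map<String, Object> fields, List<String> columns) {
        List<Object> values = new ArrayList<>();
        for (String column : columns) {
            values.add(fields.get(column));
        }
        return values;
    }

    public static void main(String[] args) {
        List<String> columns = List.of("userId", "score", "EventDate");
        System.out.println(buildInsertSql("pulsar_clickhouse_jdbc_sink", columns));
        // INSERT INTO pulsar_clickhouse_jdbc_sink(userId,score,EventDate) VALUES(?,?,?)

        // A record whose date property is named "eventDate" leaves EventDate unbound.
        Map<String, Object> fields = Map.of("userId", 1L, "score", 20, "eventDate", 1615766400000L);
        System.out.println(bindValues(fields, columns)); // [1, 20, null]
    }
}
```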
select * from pulsar_clickhouse_jdbc_sink.pulsar_clickhouse_jdbc_sink;
SELECT *
FROM pulsar_clickhouse_jdbc_sink.pulsar_clickhouse_jdbc_sink
Query id: 635f4ae8-00c0-401c-914d-ab47ccc5a718
┌─userId─┬─score─┬──EventDate─┐
│ 1 │ 30 │ 1970-01-01 │
└────────┴───────┴────────────┘
┌─userId─┬─score─┬──EventDate─┐
│ 1 │ 20 │ 1970-01-01 │
└────────┴───────┴────────────┘
2 rows in set. Elapsed: 0.256 sec.
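The date is still wrong (EventDate comes back as 1970-01-01 instead of the send date), but I'm leaving it for now: the Pulsar-to-ClickHouse sink smoke test passes.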
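The 1970-01-01 values in the query output suggest that EventDate never arrived as a usable date. ClickHouse's Date type counts days since 1970-01-01, and day 0 (1970-01-01) is also the type's default value. `java.util.Date`, by contrast, carries milliseconds since the epoch, so the two are easy to mismatch. The stdlib sketch below shows the conversions involved. Which form the sink path actually needs is an assumption to verify; this smoke test did not establish it. `ClickHouseDateSketch` is a hypothetical helper.

```java
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.Date;

// Hypothetical helpers: ClickHouse's Date type counts days since 1970-01-01,
// while java.util.Date holds milliseconds since the epoch.
final class ClickHouseDateSketch {
    // Calendar day of the timestamp in the given time zone.
    static LocalDate localDate(Date date, ZoneId zone) {
        return date.toInstant().atZone(zone).toLocalDate();
    }

    // Days since 1970-01-01, the number a ClickHouse Date column stores.
    static long epochDay(Date date, ZoneId zone) {
        return localDate(date, zone).toEpochDay();
    }

    // ISO "yyyy-MM-dd" text, which ClickHouse can parse into a Date.
    static String isoDate(Date date, ZoneId zone) {
        return localDate(date, zone).toString();
    }

    public static void main(String[] args) {
        ZoneId utc = ZoneId.of("UTC");
        Date sent = new Date(1_615_766_400_000L);      // 2021-03-15T00:00:00Z
        System.out.println(sent.getTime());            // 1615766400000 (what the POJO carries)
        System.out.println(epochDay(sent, utc));       // 18701
        System.out.println(isoDate(sent, utc));        // 2021-03-15
        System.out.println(LocalDate.ofEpochDay(0));   // 1970-01-01, as in the query output
    }
}
```

Sending EventDate as an ISO date string is one option to try. Note that the time zone chosen for the conversion decides which calendar day a timestamp near midnight lands on.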
This work is licensed under a CC license. Reposts must credit the author and link to this article.