pulsar sink clickhouse

pulsar sink clickhouse

  • 目标 pulsar topic 数据 sink 到clickhouse

创建下沉数据库

create table pulsar_clickhouse_jdbc_sink
(
    userId UInt64,
    score UInt8,
    EventDate Date
)
engine = MergeTree()
    PARTITION BY toYYYYMM(EventDate)
    ORDER BY (EventDate,intHash32(userId))
        SAMPLE BY intHash32(userId);

创建 clickhouse sink config 配置文件

configs:
        userName: "default"
        password: ""
        jdbcUrl: "jdbc:clickhouse://localhost:8123/pulsar_clickhouse_jdbc_sink"
        tableName: "pulsar_clickhouse_jdbc_sink"

在 pulsar 上创建 sink connector

本地启动 clickhouse sink

./pulsar-admin sinks localrun  \
--archive /root/apache-pulsar-2.7.0/connectors/pulsar-io-jdbc-clickhouse-2.7.0.nar   \
--tenant public  \
--namespace default \
--name sink-test-clickhouse  \
--inputs test_clickhouse  \
--sink-config-file  /root/apache-pulsar-2.7.0/bin/clickhouse.yaml 

如果找不到 sink type name需要去 archive.apache.org/dist/pulsar/pul... 下载相应的 connector 安装到 pulsar 根目录下的 connectors 目录下面

开启Schema 自动注册

定义pojo

./pulsar-admin namespaces  set-is-allow-auto-update-schema --enable public/default

pulsar 发布消息到topic

package com.example.demo.utils;

import lombok.Data;

import java.io.Serializable;
import java.util.Date ;

@Data
public class UserEvent implements Serializable {
    Long userId;
    Integer score;
    Date EventDate;
}

发送数据到 topic 查看是否sink到 clickhouse

  PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        var ch= JSONSchema.of(UserEvent.class);
        System.out.println("定义");
        System.out.println(ch.getSchemaInfo().getSchemaDefinition());
        Producer<UserEvent> producer = client.newProducer(ch)
                .enableBatching(true)
                .topic("test_clickhouse")
                .create();

        UserEvent ue=new UserEvent();
        ue.setUserId(1L);
        ue.setScore(20);
        ue.setEventDate(Calendar.getInstance().getTime());
        producer.send(ue);
select * from pulsar_clickhouse_jdbc_sink.pulsar_clickhouse_jdbc_sink;
SELECT *
FROM pulsar_clickhouse_jdbc_sink.pulsar_clickhouse_jdbc_sink

Query id: 635f4ae8-00c0-401c-914d-ab47ccc5a718

┌─userId─┬─score─┬──EventDate─┐
│      1301970-01-01 │
└────────┴───────┴────────────┘
┌─userId─┬─score─┬──EventDate─┐
│      1201970-01-01 │
└────────┴───────┴────────────┘

2 rows in set. Elapsed: 0.256 sec.

时间上还有问题 暂时先这样 pulsar sink clickhouse 冒烟测试 OK

本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!
未填写
文章
1
粉丝
1
喜欢
0
收藏
0
排名:3462
访问:44
私信
所有博文
社区赞助商