Using gpss to consume data from Kafka and load it into Greenplum
Preparing the Kafka producer and consumer
Kafka producer:
kafka-console-producer.sh --broker-list 192.168.12.115:7776 --topic gpss_test
Kafka consumer:
kafka-console-consumer.sh --bootstrap-server 192.168.12.115:7776 --topic gpss_test --from-beginning
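As a quick sanity check, you can pipe a few test records into the producer. The sample values below are invented for illustration; the column layout (tid|tcode|tname) matches the delimited load config shown later:

```shell
# Write a few made-up '|'-delimited records matching the tid|tcode|tname layout.
printf '%s\n' '1|A001|alpha' '2|A002|beta' '3|A003|gamma' > /tmp/gpss_sample_delimited.txt
cat /tmp/gpss_sample_delimited.txt
# Pipe them into the topic (requires the Kafka broker above to be running):
# kafka-console-producer.sh --broker-list 192.168.12.115:7776 --topic gpss_test < /tmp/gpss_sample_delimited.txt
```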
Introduction
There are two ways to sync data from Kafka into Greenplum:
- Start the gpss service, then register a Kafka load job with gpsscli (the focus of this post)
- Use the gpkafka utility to perform the steps above in one shot, since gpkafka wraps the functionality of gpss and gpsscli
Prepare a configuration file specifying the host and port for the gpss service
gpss4ic.json
{
  "ListenAddress": {
    "Host": "",
    "Port": 50007
  },
  "Gpfdist": {
    "Host": "",
    "Port": 8319,
    "ReuseTables": false
  }
}
Configuration files for loading Kafka data into Greenplum
- Config file for loading "|"-delimited streaming data: kafka_testdata_delimited.yaml
DATABASE: yloms
USER: gpss_usr
PASSWORD: gpss_usr
HOST: mdw
PORT: 5432
VERSION: 2
KAFKA:
  INPUT:
    SOURCE:
      BROKERS: 192.168.12.115:7776
      TOPIC: gpss_test
    VALUE:
      COLUMNS:
        - NAME: tid
          TYPE: integer
        - NAME: tcode
          TYPE: varchar
        - NAME: tname
          TYPE: varchar
      FORMAT: delimited
      DELIMITED_OPTION:
        DELIMITER: "|"
    ERROR_LIMIT: 25
  OUTPUT:
    SCHEMA: ylorder
    TABLE: test_heap
  METADATA:
    SCHEMA: ylorder
  COMMIT:
    MINIMAL_INTERVAL: 2000
  POLL:
    BATCHSIZE: 100
    TIMEOUT: 3000
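The config assumes the target table ylorder.test_heap already exists. A minimal DDL sketch derived from the COLUMNS section above (the exact table definition is an assumption; the original post does not show it):

```shell
# Write out hypothetical DDL matching the COLUMNS in the config above.
cat > /tmp/test_heap.sql <<'SQL'
CREATE SCHEMA IF NOT EXISTS ylorder;
CREATE TABLE ylorder.test_heap (
    tid   integer,
    tcode varchar,
    tname varchar
);
SQL
cat /tmp/test_heap.sql
# Apply it on the Greenplum master (requires a running cluster):
# psql -h mdw -p 5432 -U gpss_usr -d yloms -f /tmp/test_heap.sql
```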
- Config file for loading JSON-formatted streaming data: kafka_testdata_json.yaml
DATABASE: yloms
USER: gpss_usr
PASSWORD: gpss_usr
HOST: mdw
PORT: 5432
VERSION: 2
KAFKA:
  INPUT:
    SOURCE:
      BROKERS: 192.168.12.115:7776
      TOPIC: gpss_test
    VALUE:
      COLUMNS:
        - NAME: jdata
          TYPE: json
      FORMAT: json
    ERROR_LIMIT: 25
  OUTPUT:
    SCHEMA: ylorder
    TABLE: test_heap
    MAPPING:
      - NAME: tid
        EXPRESSION: (jdata->>'tid')::int
      - NAME: tcode
        EXPRESSION: (jdata->>'tcode')::varchar
      - NAME: tname
        EXPRESSION: (jdata->>'tname')::varchar
  METADATA:
    SCHEMA: ylorder
  COMMIT:
    MINIMAL_INTERVAL: 2000
  POLL:
    BATCHSIZE: 100
    TIMEOUT: 3000
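For the JSON job, each Kafka message body should be a single JSON object whose keys match the ->> extractions in MAPPING. A sketch with made-up values:

```shell
# Sample single-line JSON messages; keys match the MAPPING expressions
# (jdata->>'tid', jdata->>'tcode', jdata->>'tname') in the config above.
printf '%s\n' \
  '{"tid": 1, "tcode": "A001", "tname": "alpha"}' \
  '{"tid": 2, "tcode": "A002", "tname": "beta"}' > /tmp/gpss_sample_json.txt
cat /tmp/gpss_sample_json.txt
# Feed them to the topic (requires a running Kafka broker):
# kafka-console-producer.sh --broker-list 192.168.12.115:7776 --topic gpss_test < /tmp/gpss_sample_json.txt
```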
ETL loading with gpss:
Start the gpss service:
Command:
gpss gpss4ic.json
Log output:
20200225:22:08:21 gpss:gpadmin:greenplum-001:010656-[INFO]:-using config file: gpss4ic.json
20200225:22:08:21 gpss:gpadmin:greenplum-001:010656-[INFO]:-config file content: { "ListenAddress": { "Host": "mdw", "Port": 50007, "Certificate": { "CertFile": "", "KeyFile": "", "CAFile": "" } }, "Gpfdist": { "Host": "mdw", "Port": 8319, "ReuseTables": false, "Certificate": { "CertFile": "", "KeyFile": "", "CAFile": "" }, "BindAddress": "0.0.0.0" } }
20200225:22:08:21 gpss:gpadmin:greenplum-001:010656-[INFO]:-gpss-listen-address-prefix: mdw:50007
20200225:22:08:21 gpss:gpadmin:greenplum-001:010656-[INFO]:-gpss will use random external table name, external table won't get reused
20200225:22:08:21 gpss:gpadmin:greenplum-001:010656-[INFO]:-gpfdist listening on 0.0.0.0:8319
20200225:22:08:21 gpss:gpadmin:greenplum-001:010656-[INFO]:-gpss listening on mdw:50007
Submit a job:
Command:
gpsscli submit --name kafkajson2gp --gpss-port 50007 --gpss-host mdw ./kafka_testdata_json.yaml
Output:
20200225:22:09:16 gpsscli:gpadmin:greenplum-001:010722-[INFO]:-JobID: kafkajson2gp
List the jobs:
Command:
gpsscli list --all --gpss-port 50007 --gpss-host mdw
Output:
JobID         GPHost  GPPort  DataBase  Schema   Table      Topic      Status
kafkajson2gp  mdw     5432    yloms     ylorder  test_heap  gpss_test  JOB_STOPPED
Start the job:
Command:
gpsscli start kafkajson2gp --gpss-port 50007 --gpss-host mdw
Output:
20200225:22:10:24 gpsscli:gpadmin:greenplum-001:010756-[INFO]:-JobID: kafkajson2gp is started
List the jobs again:
Command:
gpsscli list --all --gpss-port 50007 --gpss-host mdw
Output:
JobID         GPHost  GPPort  DataBase  Schema   Table      Topic      Status
kafkajson2gp  mdw     5432    yloms     ylorder  test_heap  gpss_test  JOB_RUNNING
Stop the job:
Command:
gpsscli stop kafkajson2gp --gpss-port 50007 --gpss-host mdw
Output:
20200225:22:11:04 gpsscli:gpadmin:greenplum-001:010801-[INFO]:-Stop a job: kafkajson2gp, status JOB_STOPPED
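The submit/start/list/stop cycle above can be collected into one small script. Host, port, and job name are the values used in this post; the gpsscli calls themselves need a running gpss service, so they are left commented here:

```shell
#!/bin/sh
# Job lifecycle wrapper for the gpsscli commands demonstrated above.
GPSS_HOST=mdw
GPSS_PORT=50007
JOB=kafkajson2gp

# gpsscli submit --name "$JOB" --gpss-port "$GPSS_PORT" --gpss-host "$GPSS_HOST" ./kafka_testdata_json.yaml
# gpsscli start "$JOB" --gpss-port "$GPSS_PORT" --gpss-host "$GPSS_HOST"
# gpsscli list --all --gpss-port "$GPSS_PORT" --gpss-host "$GPSS_HOST"
# gpsscli stop "$JOB" --gpss-port "$GPSS_PORT" --gpss-host "$GPSS_HOST"
echo "job $JOB targets gpss at $GPSS_HOST:$GPSS_PORT" | tee /tmp/gpss_job_info.txt
```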
Starting the service with gpkafka load:
Note: gpkafka load can be thought of as replacing the gpsscli submit, start, and related commands above.
Command:
gpkafka --config gpss4ic.json load kafka_testdata_json.yaml
References:
Loading Kafka Data into Greenplum
yanivbhemo / greenplum-gpss
This work is published under the CC License; reposts must credit the author and link to this post.
Comment: I can't find gpsscli or gpkafka anywhere. The package I used is greenplum-db-6.8.0-rhel7-x86_64.rpm, installed via docker-compose. Any help appreciated.