用 gpss 从 kafka 消费数据加载到 greenplum

准备kafka生产者和消费者

kafka生产者程序:

kafka-console-producer.sh --broker-list 192.168.12.115:7776 --topic gpss_test

kafka消费者程序:

kafka-console-consumer.sh --bootstrap-server 192.168.12.115:7776 --topic gpss_test --from-beginning

介绍

从kafka同步数据到greenplum有两种方式:

  1. 用gpss启动服务,用gpsscli向gpss注册kafka加载作业(重点介绍)
  2. 用gpkafka组件来快速完成上面的步骤,因为gpkafka封装了gpss和gpsscli的功能
    用gpss从kafka消费数据加载到greenplum

准备一个配置文件用于配置gpss服务的host和port

gpss4ic.json

{
    "ListenAddress": {
        "Host": "",
        "Port": 50007
    },
    "Gpfdist": {
        "Host": "",
        "Port": 8319,
        "ReuseTables": false
    }
}

用于加载kafka数据到greenplum的配置文件

  1. 加载以”|”分割的流数据的配置文件 kafka_testdata_delimited.yaml
    DATABASE: yloms
    USER: gpss_usr
    PASSWORD: gpss_usr
    HOST: mdw
    PORT: 5432
    VERSION: 2
    KAFKA:
    INPUT:
       SOURCE:
          BROKERS: 192.168.12.115:7776
          TOPIC: gpss_test
       VALUE:
          COLUMNS:
            - NAME: tid
              TYPE: integer
            - NAME: tcode
              TYPE: varchar
            - NAME: tname
              TYPE: varchar
          FORMAT: delimited
          DELIMITED_OPTION:
            DELIMITER: "|"
       ERROR_LIMIT: 25
    OUTPUT:
       SCHEMA: ylorder
       TABLE: test_heap
    METADATA:
       SCHEMA: ylorder
    COMMIT:
       MINIMAL_INTERVAL: 2000
    POLL:
       BATCHSIZE: 100
       TIMEOUT: 3000
  2. 加载JSON格式流数据的配置文件kafka_testdata_json.yaml
    DATABASE: yloms
    USER: gpss_usr
    PASSWORD: gpss_usr
    HOST: mdw
    PORT: 5432
    VERSION: 2
    KAFKA:
    INPUT:
       SOURCE:
          BROKERS: 192.168.12.115:7776
          TOPIC: gpss_test
       VALUE:
         COLUMNS:
            - NAME: jdata
              TYPE: json
         FORMAT: json
       ERROR_LIMIT: 25
    OUTPUT:
       SCHEMA: ylorder
       TABLE: test_heap
       MAPPING:
         - NAME: tid
           EXPRESSION: (jdata->>'tid')::int
         - NAME: tcode
           EXPRESSION: (jdata->>'tcode')::varchar
         - NAME: tname
           EXPRESSION: (jdata->>'tname')::varchar
    METADATA:
       SCHEMA: ylorder
    COMMIT:
       MINIMAL_INTERVAL: 2000
    POLL:
       BATCHSIZE: 100
       TIMEOUT: 3000

用gpss做etl加载:

启动gpss服务:

命令格式

gpss gpss4ic.json

日志输出

20200225:22:08:21 gpss:gpadmin:greenplum-001:010656-[INFO]:-using config file: gpss4ic.json
20200225:22:08:21 gpss:gpadmin:greenplum-001:010656-[INFO]:-config file content: {
        "ListenAddress": {
                "Host": "mdw",
                "Port": 50007,
                "Certificate": {
                        "CertFile": "",
                        "KeyFile": "",
                        "CAFile": ""
                }
        },
        "Gpfdist": {
                "Host": "mdw",
                "Port": 8319,
                "ReuseTables": false,
                "Certificate": {
                        "CertFile": "",
                        "KeyFile": "",
                        "CAFile": ""
                },
                "BindAddress": "0.0.0.0"
        }
}
20200225:22:08:21 gpss:gpadmin:greenplum-001:010656-[INFO]:-gpss-listen-address-prefix: mdw:50007
20200225:22:08:21 gpss:gpadmin:greenplum-001:010656-[INFO]:-gpss will use random external table name, external table won't get reused
20200225:22:08:21 gpss:gpadmin:greenplum-001:010656-[INFO]:-gpfdist listening on 0.0.0.0:8319
20200225:22:08:21 gpss:gpadmin:greenplum-001:010656-[INFO]:-gpss listening on mdw:50007

提交一个作业:

命令格式

gpsscli submit --name kafkajson2gp --gpss-port 50007 --gpss-host mdw ./kafka_testdata_json.yaml

输出如下

20200225:22:09:16 gpsscli:gpadmin:greenplum-001:010722-[INFO]:-JobID: kafkajson2gp

查看作业列表:

命令格式

gpsscli list --all --gpss-port 50007 --gpss-host mdw

输出如下

JobID                               GPHost          GPPort  DataBase        Schema          Table                           Topic           Status
kafkajson2gp                        mdw             5432    yloms           ylorder         test_heap                       gpss_test       JOB_STOPPED

启动作业:

命令格式

gpsscli start kafkajson2gp --gpss-port 50007 --gpss-host mdw

输出如下

20200225:22:10:24 gpsscli:gpadmin:greenplum-001:010756-[INFO]:-JobID: kafkajson2gp is started

再次查看作业:

命令格式

gpsscli list --all --gpss-port 50007 --gpss-host mdw

输出如下

JobID                               GPHost          GPPort  DataBase        Schema          Table                           Topic           Status
kafkajson2gp                        mdw             5432    yloms           ylorder         test_heap                       gpss_test       JOB_RUNNING

停掉作业:

命令格式

gpsscli stop kafkajson2gp --gpss-port 50007 --gpss-host mdw

输出如下

20200225:22:11:04 gpsscli:gpadmin:greenplum-001:010801-[INFO]:-Stop a job: kafkajson2gp, status JOB_STOPPED

用gpkafka load启动服务:

注意:gpkafka load 可以理解为代替了gpsscli上的提交作业,启动作业等命令。
命令格式

gpkafka --config gpss4ic.json load kafka_testdata_json.yaml

参考文档:
Loading Kafka Data into Greenplum
yanivbhemo / greenplum-gpss

本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 1

gpscli 或者gpkafka我怎么都找不到,我用的安装包是:greenplum-db-6.8.0-rhel7-x86_64.rpm,使用docker-compose安装的。求解

3年前 评论
18015290123 (作者) 3年前

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!