Kafka 介绍安装及基本命令

一、Kafka 概述

1.1、定义

Kafka传统定义:Kafka是 一个分布式的基于发布/订阅模式的消息队列(Message
Queue),主要应用于大数据实时处理领域。
发布/订阅:消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息
分为不同的类别,订阅者只接收感兴趣的消息。

Kafka最新定义:Kafka是一个开源的分布式事件流平台( Event Streaming
Platform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。

1.2、消息队列

目前企业中比较常见的消息队列产品主要有Kafka、 ActiveMQ、RabbitMQ、
RocketMQ等。
在大数据场景主要采用Kafka 作为消息队列。在JavaEE开发中主要采用ActiveMQ、
RabbitMQ、RocketMQ。

1.2.1、传统消息队列应用场景

传统的消息队列的主要应用场景包括:缓存/消峰、解耦和异步通信。

1.2.2、消息队列的两种模式

1.点对点模式

消费者主动拉取数据消息收到后清除消息

Kafka 介绍及使用

2.发布订阅模式

可以有多个topic主题(浏览、点赞、收藏、评论等)
消费者消费数据之后,不删除数据
每个消费者相互独立,都可以消费到数据

Kafka 介绍及使用

1.3、Kafka 基础架构

  1. 为了方便拓展,并提高吞吐量,一个 topic 分为多个 partition
  2. 配合分区设计,提出消费者组的概念,组内每个消费者并行消费
  3. 为了提高可用性,为每个消费者增加若干个副本,类似 NameNode HA
  4. Zookeeper 记录谁是 Leader,Kafka2.8.0 以后可以配置不采用 ZK

Kafka 介绍及安装

  1. Producer:消息生产者,就是向 Kafka broker 发送消息的客户端。
  2. Consumer:消息消费者,向 Kafka broker 取消息的客户端。
  3. Consumer Group(CG):消费者组,由多个 consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组内之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
  4. Broker:一台 Kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 可以容纳多个 topic。
  5. Topic:可以理解为一个队列,生产者和消费者面向的都是一个 topic。
  6. Partition:为了实现拓展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序队列。
  7. Replica:副本。一个 topic 的每个分区都有若干个副本,一个 Leader 和若干个 Follower。
  8. Leader:每个分区多个副本的”主”,生产者发送数据的对象,以及消费者消费数据的对象都是 Leader。
  9. Follower:每个分区多个副本中的”从”,实时从 Leader 中同步数据,保持和 Leader 数据的同步。Leader 发生故障时,某个 Follower 会成为新的 Leader。

二、Kafka 集群部署

这里以单机伪集群部署,多服务器大同小异

2.1、下载

Kafka 官方下载地址

2.2、解压并修改目录名称

$ tar -zxvf kafka_2.13-3.2.3.tgz -C .

$ mv kafka_2.13-3.2.3 kafka

2.3、修改配置文件

$ cd kafka
# 创建 data 存放的目录
$ mkdir -p datas/data0 datas/data1 datas/data2
# 创建存放配置文件目录
$ mkdir -p etc/kafka0 etc/kafka1 etc/kafka2
# 拷贝配置文件
$ cp cp config/server.properties etc/kafka0
$ cp cp config/server.properties etc/kafka1
$ cp cp config/server.properties etc/kafka2
# 修改配置文件
$ vim etc/kafka2/server.properties

修改内容如下

kafka0/server.properties

# broker 的全局唯一编号,不能重复,只能是数字
broker.id=0
# kafka 的端口
listeners=PLAINTEXT://:9092
# kafka 运行日志(数据)存放路径,可以不提前创建,kafka会自动创建,可以配置多个,用逗号分开
log.dirs=/kafka/kafka/datas/data0
# /kafka 会在zookeeper根节点下创建/kafka节点,便于后期维护
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183/kafka

kafka1/server.properties

broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/kafka/kafka/datas/data1
# /kafka 会在zookeeper根节点下创建/kafka节点,便于后期维护
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183/kafka

kafka2/server.properties

broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/kafka/kafka/datas/data2
# /kafka 会在zookeeper根节点下创建/kafka节点,便于后期维护
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183/kafka

2.4、编写启动停止脚本

#!/bin/bash

case $1 in
"start")
    for i in kafka0 kafka1 kafka2
    do
        echo ------ $i 启动 ------
        "/Users/hudu/Environment/kafka/kafka/bin/kafka-server-start.sh" -daemon /Users/hudu/Environment/kafka/kafka/etc/$i/server.properties
    done
;;
"stop")
    # for i in kafka0 kafka1 kafka2
    # do
        echo ------ kafka 停止 ------
        "/Users/hudu/Environment/kafka/kafka/bin/kafka-server-stop.sh"
    # done
;;
esac

2.5、启动和停止

$ chomd u+x kf.sh
$ kf.sh start
$ kf.sh stop

注意:停止 Kafka 集群时,一定要等 Kafka 所有节点进程全部停止后再停止 Zookeeper 集群。因为Zookeeper 集群当中记录着 Kafka 集群相关信息,Zookeeper 集群一旦 先停止,Kafka 集群就没有办法再获取停止进程的信息,只能手动杀死 Kafka 进程了。

三、Kafka 命令行操作

3.1、Kafka 主题命令行操作

3.1.1、常用参数列表如下

参数 描述
--bootstrap-server <String: server to connect to> 连接的 Kafka Broker 主机名称和端口号
--topic <String:topic> 操作的 topic 名称
--create 创建主题
--delete 删除主题
--alter 修改主题
--list 查看所有主题
--describe 查看主题详细描述
--partitions <Integer: # of partitions> 设置分区数
--replication-factor <Integer:replication factor> 设置分区副本数
--config <String: name=value> 更新系统默认的配置

3.1.2、查看当前服务器中的所有 topic

$  kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

3.1.3、创建 topic

$ kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic first --create --partitio
ns 1 --replication-factor 3
Created topic first.

参数说明:

--topic:定义 topic 名
--replication-factor:定义副本数
--partitions:定义分区数

3.1.4、查看主题详情

$ kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic first --describe
Topic: first    TopicId: nW9OWbmpSnGZE7MLxwK6Dw    PartitionCount: 1    ReplicationFactor: 3    Configs: segment.bytes=1073741824
    Topic: first    Partition: 0    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0

3.1.5、修改分区数

注意:分区数只能增加,不能减少。并且副本数无法通过命令行修改。

$ kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic first --alter --partitions 3

$ kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic first --describe
Topic: first    TopicId: nW9OWbmpSnGZE7MLxwK6Dw    PartitionCount: 3    ReplicationFactor: 3    Configs: segment.bytes=1073741824
    Topic: first    Partition: 0    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0
    Topic: first    Partition: 1    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
    Topic: first    Partition: 2    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0

3.1.6、删除 topic

$ kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic first

3.2、生产者命令行操作

参数 描述
--bootstrap-server <String: server to connect to> 连接 Kafka Broker 主机名称和端口号
--topic <String:topic> 操作的 topic 名称

发送消息

$ kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first
>hello
>test
>hello

3.3、消费者命令行操作

参数 描述
--bootstrap-server <String: server to connect to> 连接 Kafka Broker 主机名称和端口号
--topic <String:topic> 操作的 topic 名称
--from-beginning 从头开始消费
--group <String:consumer group id> 指定消费者组名称

消费 first 主题中的数据

$ kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first

消费 first 主题中的数据(包括历史数据)

$ kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!
未填写
文章
247
粉丝
18
喜欢
217
收藏
62
排名:731
访问:9753
私信
所有博文
社区赞助商