kafka(docker) 入门分享

kafka 入门分享

一开始我自己用 Docker 跑了一个 Kafka,但是踩了很多坑:先是容器跑不起来,后来又遇到容器内无法消费、本地连接失败等问题。
后来发现是不同镜像的依赖和配置方式不一样,最后选用了 bitnami/zookeeper 和 bitnami/kafka。

上代码

docker-compose.yml

# Local development stack: ZooKeeper, a single Kafka broker and the
# kafka-manager web UI (published on host port 9190).
# NOTE(review): both Bitnami images use the unpinned `latest` tag; ZooKeeper
# mode depends on the Kafka version shipped, so consider pinning a tag.
version: '3'
services:
  zookeeper:
    image: bitnami/zookeeper:latest
    ports:
      - "2181:2181"
    environment:
      # Development only: accept unauthenticated ZooKeeper clients.
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: bitnami/kafka:latest
    depends_on: [ zookeeper ]
    ports:
      - "9092:9092"
      - "9093:9093"
    environment:
      # NOTE(review): the next two variables look like wurstmeister/kafka
      # options; the Bitnami image is configured through KAFKA_CFG_* variables,
      # so these are presumably ignored. Verify before relying on the "test" topic.
      - KAFKA_ADVERTISED_HOST_NAME=kafka
      - KAFKA_CREATE_TOPICS="test:1:1"
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      # Development only: allow listeners without TLS/SASL.
      - ALLOW_PLAINTEXT_LISTENER=yes
      # Two plaintext listeners: CLIENT (9092) for containers on the compose
      # network, EXTERNAL (9093) for programs running on the Docker host.
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
      # CLIENT must advertise the service name: advertising localhost would
      # send other containers back to themselves. Host programs use
      # localhost:9093, which is what the Go examples dial.
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
    volumes:
      # NOTE(review): mounting a host path as the Docker socket looks like a
      # leftover from another image's setup; confirm it is still needed.
      - /data/product/zj_bigdata/data/kafka/docker.sock:/var/run/docker.sock
  kafka-manager:
    image: sheepkiller/kafka-manager
    depends_on: [ zookeeper ]
    ports:
      - "9190:9000"
    environment:
      ZK_HOSTS: zookeeper:2181

producer.go

package main

import (
    "context"
    "log"
    "time"

    "github.com/segmentio/kafka-go"
)

// main publishes three sample messages to partition 0 of the "gotest" topic,
// connecting to the partition leader through localhost:9093 (the broker's
// EXTERNAL listener in the compose file). Any failure terminates the program
// with a logged error.
func main() {
	topic := "gotest"
	partition := 0

	conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9093", topic, partition)
	if err != nil {
		// Fatalf with an explicit ": %v" is used because log.Fatal(msg, err)
		// prints no space between a string operand and the error.
		log.Fatalf("failed to dial leader: %v", err)
	}

	// Bound the write so an unreachable broker fails after 10 seconds instead
	// of blocking indefinitely.
	if err := conn.SetWriteDeadline(time.Now().Add(10 * time.Second)); err != nil {
		log.Fatalf("failed to set write deadline: %v", err)
	}

	_, err = conn.WriteMessages(
		kafka.Message{Value: []byte("one!")},
		kafka.Message{Value: []byte("two!")},
		kafka.Message{Value: []byte("three!")},
	)
	if err != nil {
		log.Fatalf("failed to write messages: %v", err)
	}

	if err := conn.Close(); err != nil {
		log.Fatalf("failed to close writer: %v", err)
	}
}

consumer.go

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/segmentio/kafka-go"
)

// main reads one batch of messages from partition 0 of the "gotest" topic
// through localhost:9093 and prints each message value on its own line.
// Any failure terminates the program with a logged error.
func main() {
	topic := "gotest"
	partition := 0

	conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9093", topic, partition)
	if err != nil {
		// Fatalf with an explicit ": %v" is used because log.Fatal(msg, err)
		// prints no space between a string operand and the error.
		log.Fatalf("failed to dial leader: %v", err)
	}

	// Bound the fetch to 10 seconds so the program cannot block forever.
	if err := conn.SetReadDeadline(time.Now().Add(10 * time.Second)); err != nil {
		log.Fatalf("failed to set read deadline: %v", err)
	}
	batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max

	b := make([]byte, 10e3) // 10KB max per message
	for {
		n, err := batch.Read(b)
		if err != nil {
			// Any error ends the loop, including the one that signals the
			// batch is exhausted. Per the kafka-go docs, a genuine read
			// failure is reported by batch.Close below.
			break
		}
		fmt.Println(string(b[:n]))
	}

	if err := batch.Close(); err != nil {
		log.Fatalf("failed to close batch: %v", err)
	}

	if err := conn.Close(); err != nil {
		log.Fatalf("failed to close connection: %v", err)
	}
}

参考链接

本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 1

kafka 的 dockerfile 可以参考这个 kafka-docker

2年前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!