Skip to content

メッセージ送受信サンプルコード

Apache Kafkaのメッセージの送信・受信を行うgolangのサンプルコードです。

cf. https://github.com/segmentio/kafka-go

事前準備

  • トピックquickstart-eventsを作成してください:
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
Created topic quickstart-events.

$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events        TopicId: mvQtvjNfRgWhufRT9kQv0w PartitionCount: 1       ReplicationFactor: 1    Configs: min.insync.replicas=1,segment.bytes=1073741824
        Topic: quickstart-events        Partition: 0    Leader: 1       Replicas: 1     Isr: 1  Elr:    LastKnownElr:
  • 下記で必要なパッケージをインストールしてください:
go get github.com/segmentio/kafka-go

送信

package main

import (
    "context"
    "log"

    "github.com/segmentio/kafka-go"
)

func main() {
    w := kafka.NewWriter(kafka.WriterConfig{
        Brokers: []string{"localhost:9092"},
        Topic:   "quickstart-events",
    })
    defer w.Close()

    body := "Hello World!"
    err := w.WriteMessages(context.Background(),
        kafka.Message{
            Key:   []byte("hello"),
            Value: []byte(body),
        },
    )

    if err != nil {
        panic(err)
    }

    log.Printf(" Sent %s", body)
}

実行すると、下記のようなメッセージが表示されます:

$ go run main.go
2026/01/11 14:35:32  Sent Hello World!

受信

package main

import (
    "context"
    "log"

    "github.com/segmentio/kafka-go"
)

func main() {
    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"localhost:9092"},
        Topic:   "quickstart-events",
    })
    defer r.Close()

    for {
        msg, err := r.ReadMessage(context.Background())
        if err != nil {
            panic(err)
        }

        log.Printf(" Received a message: %s\n", string(msg.Value))
    }
}

メッセージがpushされると下記のようなメッセージが出力されます:

2026/01/11 14:46:00  Received a message: Hello World!