Skip to content

メッセージ送受信サンプルコード

Apache Kafkaのメッセージの送信・受信を行うgolangのサンプルコードです。

cf. https://github.com/segmentio/kafka-go

事前準備

  • トピックquickstart-eventsを作成してください:
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
Created topic quickstart-events.

$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events        TopicId: mvQtvjNfRgWhufRT9kQv0w PartitionCount: 1       ReplicationFactor: 1    Configs: min.insync.replicas=1,segment.bytes=1073741824
        Topic: quickstart-events        Partition: 0    Leader: 1       Replicas: 1     Isr: 1  Elr:    LastKnownElr:
  • 下記で必要なパッケージをインストールしてください:
shell
go get github.com/segmentio/kafka-go

送信

go
package main

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   "quickstart-events",
	})
	defer w.Close()

	body := "Hello World!"
	err := w.WriteMessages(context.Background(),
		kafka.Message{
			Key:   []byte("hello"),
			Value: []byte(body),
		},
	)

	if err != nil {
		panic(err)
	}

	log.Printf(" Sent %s", body)
}

実行すると、下記のようなメッセージが表示されます:

$ go run main.go
2026/01/11 14:35:32  Sent Hello World!

受信

go
package main

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   "quickstart-events",
	})
	defer r.Close()

	for {
		msg, err := r.ReadMessage(context.Background())
		if err != nil {
			panic(err)
		}

		log.Printf(" Received a message: %s\n", string(msg.Value))
	}
}

メッセージがpushされると下記のようなメッセージが出力されます:

2026/01/11 14:46:00  Received a message: Hello World!