メッセージ送受信サンプルコード
Apache Kafkaのメッセージの送信・受信を行うgolangのサンプルコードです。
cf. https://github.com/segmentio/kafka-go
事前準備
- トピック
quickstart-eventsを作成してください:
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
Created topic quickstart-events.
$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events TopicId: mvQtvjNfRgWhufRT9kQv0w PartitionCount: 1 ReplicationFactor: 1 Configs: min.insync.replicas=1,segment.bytes=1073741824
Topic: quickstart-events Partition: 0 Leader: 1 Replicas: 1 Isr: 1 Elr: LastKnownElr:
- 下記で必要なパッケージをインストールしてください:
go get github.com/segmentio/kafka-go
送信
package main
import (
"context"
"log"
"github.com/segmentio/kafka-go"
)
func main() {
w := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: "quickstart-events",
})
defer w.Close()
body := "Hello World!"
err := w.WriteMessages(context.Background(),
kafka.Message{
Key: []byte("hello"),
Value: []byte(body),
},
)
if err != nil {
panic(err)
}
log.Printf(" Sent %s", body)
}
実行すると、下記のようなメッセージが表示されます:
$ go run main.go
2026/01/11 14:35:32 Sent Hello World!
受信
package main
import (
"context"
"log"
"github.com/segmentio/kafka-go"
)
func main() {
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "quickstart-events",
})
defer r.Close()
for {
msg, err := r.ReadMessage(context.Background())
if err != nil {
panic(err)
}
log.Printf(" Received a message: %s\n", string(msg.Value))
}
}
メッセージがpushされると下記のようなメッセージが出力されます:
2026/01/11 14:46:00 Received a message: Hello World!