本文讲一下如何在 go-zero 连接 kafka 并引入若干个消费者。
我们先来看一下 go-zero 中的 kafka 配置项:
github.com\zeromicro\go-queue@v1.2.2\kq\config.go
package kq
import "github.com/zeromicro/go-zero/core/service"
const (
firstOffset = "first"
lastOffset = "last"
)
type KqConf struct {
service.ServiceConf
Brokers []string
Group string
Topic string
CaFile string `json:",optional"`
Offset string `json:",options=first|last,default=last"`
Conns int `json:",default=1"`
Consumers int `json:",default=8"`
Processors int `json:",default=8"`
MinBytes int `json:",default=10240"` // 10K
MaxBytes int `json:",default=10485760"` // 10M
Username string `json:",optional"`
Password string `json:",optional"`
ForceCommit bool `json:",default=true"`
CommitInOrder bool `json:",default=false"`
}
在配置文件中填写对应的配置:
mq/etc/task.yaml
Name: task.mq
ListenOn: 0.0.0.0:10091
MsgChatTransfer:
Name: MsgChatTransfer
Brokers:
- x.x.x.x:9092
Group: kafka
Topic: test
Consumers: 1
定义好需要用到的配置结构:
mq/internal/config/config.go
package config
import (
"github.com/zeromicro/go-queue/kq"
"github.com/zeromicro/go-zero/core/service"
)
type Config struct {
service.ServiceConf
ListenOn string
MsgChatTransfer kq.KqConf
}
在入口中通过 go-zero 的配置组件读取配置:
mq/task.go
package main
import (
"flag"
"fmt"
"github.com/zeromicro/go-zero/core/conf"
"github.com/zeromicro/go-zero/core/service"
)
var configFile = flag.String("f", "etc/task.yaml", "the config file")
func main() {
flag.Parse()
var c config.Config
conf.MustLoad(*configFile, &c)
if err := c.SetUp(); err != nil {
panic(err)
}
}
定义一下服务上下文:
mq/internal/svc/svc.go
type ServiceContext struct {
config.Config
}
func NewServiceContext(c config.Config) *ServiceContext {
return &ServiceContext{
Config: c,
}
}
到这里,我们就可以开始实现 kafka 的消费者了。我们找到 go-zero 中 ConsumeHandler 的定义:
ConsumeHandler interface {
Consume(ctx context.Context, key, value string) error
}
可以看到,要编写消费者,只要实现 Consume(ctx context.Context, key, value string) error 这个方法即可。我们编写一个测试消费者,功能是将 kv 打印:
mq/internal/handler/test.go
package test
type TestConsumer struct {
logx.Logger
svcCtx *svc.ServiceContext
}
func NewTestConsumer(svcCtx *svc.ServiceContext) *TestConsumer {
return &TestConsumer{
Logger: logx.WithContext(context.Background()),
svcCtx: svcCtx,
}
}
func (m *TestConsumer) Consume(ctx context.Context, key, val string) error {
fmt.Println("key: ", key)
fmt.Println("val: ", val)
return nil
}
怎么启动这个消费者呢?可以使用 go-zero 中的这个方法:
func kq.MustNewQueue(c kq.KqConf, handler kq.ConsumeHandler, opts ...kq.QueueOption) queue.MessageQueue
我们在入口中启动 TestConsumer:
mq/task.go
package main
import (
"flag"
"fmt"
"github.com/zeromicro/go-zero/core/conf"
"github.com/zeromicro/go-zero/core/service"
)
var configFile = flag.String("f", "etc/task.yaml", "the config file")
func main() {
flag.Parse()
var c config.Config
conf.MustLoad(*configFile, &c)
if err := c.SetUp(); err != nil {
panic(err)
}
svcCtx:=svc.NewServiceContext(c)
q:=kq.MustNewQueue(c.kqConf, handler.NewTestConsumer(svcCtx))
q.Start()
defer q.Stop()
}
随后进行测试,向 kafka 中发送消息,看有无输出即可。这种方法固然是已经实现了需求,但是还有一个较大的问题,就是假如 ConsumerHandler 很多的情况下,需要一个个地 Start ,非常麻烦。为了更方便使用,我们可以借助 go-zero 中的 ServcieGroup。
在 go-zero 中,只要实现了 Start 和 Stop,就可以视为实现了 Service 接口。因此我们可以编写一个 listen 用于管理所有消费者:
mq/internal/handler/listen.go
package handler
import (
"github.com/zeromicro/go-queue/kq"
"github.com/zeromicro/go-zero/core/service"
)
type Listen struct {
svcCtx *svc.ServiceContext
}
func NewListen(svcCtx *svc.ServiceContext) *Listen {
return &Listen{
svcCtx: svcCtx,
}
}
func (l *Listen) Services() []service.Service {
return []service.Service{
kq.MustNewQueue(l.svcCtx.Config.MsgChatTransfer, msgTransfer.NewMsgChatTransfer(l.svcCtx)),
}
}
在入口中,调用 Services 方法,获取所有服务,并同时启动和停止:
mq/task.go
package main
import (
"flag"
"fmt"
"github.com/zeromicro/go-zero/core/conf"
"github.com/zeromicro/go-zero/core/service"
)
var configFile = flag.String("f", "etc/task.yaml", "the config file")
func main() {
flag.Parse()
var c config.Config
conf.MustLoad(*configFile, &c)
if err := c.SetUp(); err != nil {
panic(err)
}
ctx := svc.NewServiceContext(c)
listen := handler.NewListen(ctx)
serviceGroup := service.NewServiceGroup()
for _, mq := range listen.Services() {
serviceGroup.Add(mq)
}
fmt.Println("Starting mq server at ", c.ListenOn)
serviceGroup.Start()
defer serviceGroup.Stop()
}
如果之后有新的 ConsumeHandler,只需要添加到 Services 中就可以了。