go-zero中的kafka组件

本文讲一下如何在 go-zero 连接 kafka 并引入若干个消费者。

我们先来看一下 go-zero 中的 kafka 配置项:

github.com\zeromicro\go-queue@v1.2.2\kq\config.go

package kq

import "github.com/zeromicro/go-zero/core/service"

const (
    firstOffset = "first"
    lastOffset  = "last"
)

type KqConf struct {
    service.ServiceConf
    Brokers       []string
    Group         string
    Topic         string
    CaFile        string `json:",optional"`
    Offset        string `json:",options=first|last,default=last"`
    Conns         int    `json:",default=1"`
    Consumers     int    `json:",default=8"`
    Processors    int    `json:",default=8"`
    MinBytes      int    `json:",default=10240"`    // 10K
    MaxBytes      int    `json:",default=10485760"` // 10M
    Username      string `json:",optional"`
    Password      string `json:",optional"`
    ForceCommit   bool   `json:",default=true"`
    CommitInOrder bool   `json:",default=false"`
}

在配置文件中填写对应的配置:

mq/etc/task.yaml

Name: task.mq
ListenOn: 0.0.0.0:10091
MsgChatTransfer:
  Name: MsgChatTransfer
  Brokers:
    - x.x.x.x:9092
  Group: kafka
  Topic: test
  Consumers: 1

定义好需要用到的配置结构:

mq/internal/config/config.go

package config

import (
    "github.com/zeromicro/go-queue/kq"
    "github.com/zeromicro/go-zero/core/service"
)

type Config struct {
    service.ServiceConf

    ListenOn string

    MsgChatTransfer kq.KqConf
}

在入口中通过 go-zero 的配置组件读取配置:

mq/task.go

package main

import (
    "flag"
    "fmt"

    "github.com/zeromicro/go-zero/core/conf"
    "github.com/zeromicro/go-zero/core/service"
)

var configFile = flag.String("f", "etc/task.yaml", "the config file")

func main() {
    flag.Parse()

    var c config.Config
    conf.MustLoad(*configFile, &c)

    if err := c.SetUp(); err != nil {
        panic(err)
    }
}

定义一下服务上下文:

mq/internal/svc/svc.go

type ServiceContext struct {
    config.Config
}

func NewServiceContext(c config.Config) *ServiceContext {
    return &ServiceContext{
        Config: c,
    }
}

到这里,我们就可以开始实现 kafka 的消费者了。我们找到 go-zero 中 ConsumeHandler 的定义:

ConsumeHandler interface {
    Consume(ctx context.Context, key, value string) error
}

可以看到,要编写消费者,只要实现 Consume(ctx context.Context, key, value string) error 这个方法即可。我们编写一个测试消费者,功能是将 kv 打印:

mq/internal/handler/test.go

package test

type TestConsumer struct {
    logx.Logger
    svcCtx *svc.ServiceContext
}

func NewTestConsumer(svcCtx *svc.ServiceContext) *TestConsumer {
    return &TestConsumer{
        Logger: logx.WithContext(context.Background()),
        svcCtx: svcCtx,
    }
}

func (m *TestConsumer) Consume(ctx context.Context, key, val string) error {
    fmt.Println("key: ", key)
    fmt.Println("val: ", val)
    return nil
}

怎么启动这个消费者呢?可以使用 go-zero 中的这个方法:

func kq.MustNewQueue(c kq.KqConf, handler kq.ConsumeHandler, opts ...kq.QueueOption) queue.MessageQueue

我们在入口中启动 TestConsumer:

mq/task.go

package main

import (
    "flag"
    "fmt"

    "github.com/zeromicro/go-zero/core/conf"
    "github.com/zeromicro/go-zero/core/service"
)

var configFile = flag.String("f", "etc/task.yaml", "the config file")

func main() {
    flag.Parse()

    var c config.Config
    conf.MustLoad(*configFile, &c)

    if err := c.SetUp(); err != nil {
        panic(err)
    }

    svcCtx:=svc.NewServiceContext(c)

    q:=kq.MustNewQueue(c.kqConf, handler.NewTestConsumer(svcCtx))

    q.Start()
    defer q.Stop()
}

随后进行测试,向 kafka 中发送消息,看有无输出即可。这种方法固然是已经实现了需求,但是还有一个较大的问题,就是假如 ConsumerHandler 很多的情况下,需要一个个地 Start ,非常麻烦。为了更方便使用,我们可以借助 go-zero 中的 ServcieGroup。

在 go-zero 中,只要实现了 Start 和 Stop,就可以视为实现了 Service 接口。因此我们可以编写一个 listen 用于管理所有消费者:

mq/internal/handler/listen.go

package handler

import (
    "github.com/zeromicro/go-queue/kq"
    "github.com/zeromicro/go-zero/core/service"
)

type Listen struct {
    svcCtx *svc.ServiceContext
}

func NewListen(svcCtx *svc.ServiceContext) *Listen {
    return &Listen{
        svcCtx: svcCtx,
    }
}

func (l *Listen) Services() []service.Service {
    return []service.Service{
        kq.MustNewQueue(l.svcCtx.Config.MsgChatTransfer, msgTransfer.NewMsgChatTransfer(l.svcCtx)),
    }
}

在入口中,调用 Services 方法,获取所有服务,并同时启动和停止:

mq/task.go

package main

import (
    "flag"
    "fmt"

    "github.com/zeromicro/go-zero/core/conf"
    "github.com/zeromicro/go-zero/core/service"
)

var configFile = flag.String("f", "etc/task.yaml", "the config file")

func main() {
    flag.Parse()

    var c config.Config
    conf.MustLoad(*configFile, &c)

    if err := c.SetUp(); err != nil {
        panic(err)
    }

    ctx := svc.NewServiceContext(c)

    listen := handler.NewListen(ctx)

    serviceGroup := service.NewServiceGroup()

    for _, mq := range listen.Services() {
        serviceGroup.Add(mq)
    }

    fmt.Println("Starting mq server at ", c.ListenOn)
    serviceGroup.Start()
    defer serviceGroup.Stop()
}

如果之后有新的 ConsumeHandler,只需要添加到 Services 中就可以了。

你刚刚浪费了人生中宝贵的几分钟。
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇