分享
  1. 首页
  2. 文章

一招让Kafka达到最佳吞吐量

kevinwan · · 599 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

通过上一篇文章对 dq 生产者的分析,我们知道 dq 是基于 beanstalk 的封装。至于 生产者 我们在后续的文章继续分享,本篇文章先来分析一下 go-queue 中的 kq

kq 基于 kafka 封装,设计之初是为了使 kafka 的使用更人性化。那就来看看 kq 的使用。

上手使用

func main() {
 // 1. 初始化
 pusher := kq.NewPusher([]string{
 "127.0.0.1:19092",
 "127.0.0.1:19092",
 "127.0.0.1:19092",
 }, "kq")
 ticker := time.NewTicker(time.Millisecond)
 for round := 0; round < 3; round++ {
 select {
 case <-ticker.C:
 count := rand.Intn(100)
 m := message{
 Key: strconv.FormatInt(time.Now().UnixNano(), 10),
 Value: fmt.Sprintf("%d,%d", round, count),
 Payload: fmt.Sprintf("%d,%d", round, count),
 }
 body, err := json.Marshal(m)
 if err != nil {
 log.Fatal(err)
 }
 fmt.Println(string(body))
 // 2. 写入
 if err := pusher.Push(string(body)); err != nil {
 log.Fatal(err)
 }
 }
 }
}

kafka cluster 配置以及 topic 传入,你就得到一个操作 kafkapush operator

至于写入消息,简单的调用 pusher.Push(msg) 就行。是的,就这么简单!

当然,目前只支持单个 msg 写入。可能有人会疑惑,那就继续往下看,为什么只能一条一条写入?

初始化

一起看看 pusher 初始化哪些步骤:

NewPusher(clusterAddrs, topic, opts...)
 |- kafka.NewWriter(kfConfig) // 与 kf 之前的连接
 |- executor = executors.NewChunkExecutor() // 设置内部写入的executor为字节数定量写入
  1. 建立与 kafka cluster 的连接。此处肯定就要传入 kafka config;
  2. 设置内部暂存区的写入函数以及刷新规则。

使用 chunkExecutor 作用不言而喻:将随机写 -> 批量写,减少 I/O 消耗;同时保证单次写入不能超过默认的 1M 或者自己设定的最大写入字节数。

其实再往 chunkExecutor 内部看,其实每次触发插入有两个指标:

  • maxChunkSize:单次最大写入字节数
  • flushInterval:刷新暂存消息插入的间隔时间

在触发写入,只要满足任意一个指标都会执行写入。同时在 executors 都有设置插入间隔时间,以防暂存区写入阻塞而暂存区内消息一直不被刷新清空。

更多关于 executors 可以参看以下:https://zeromicro.github.io/g...

生产者插入

根据上述初始化对 executors 介绍,插入过程中也少不了它的配合:

func (p *Pusher) Push(v string) error {
 // 1. 将 msg -> kafka 内部的 Message
 msg := kafka.Message{
 Key: []byte(strconv.FormatInt(time.Now().UnixNano(), 10)),
 Value: []byte(v),
 }
 
 // 使用 executor.Add() 插入内部的 container
 // 当 executor 初始化失败或者是内部发生错误,也会将 Message 直接插入 kafka
 if p.executor != nil {
 return p.executor.Add(msg, len(v))
 } else {
 return p.produer.WriteMessages(context.Background(), msg)
 }
}

过程其实很简单。那 executors.Add(msg, len(msg)) 是怎么把 msg 插入到 kafka 呢?

插入的逻辑其实在初始化中就声明了:

pusher.executor = executors.NewChunkExecutor(func(tasks []interface{}) {
 chunk := make([]kafka.Message, len(tasks))
 // 1
 for i := range tasks {
 chunk[i] = tasks[i].(kafka.Message)
 }
 // 2
 if err := pusher.produer.WriteMessages(context.Background(), chunk...); err != nil {
 logx.Error(err)
 }
 }, newOptions(opts)...)
  1. 触发插入时,将暂存区中存储的 []msg 依次拿出,作为最终插入消息集合;
  2. 将上一步的消息集合,作为一个批次插入 kafkatopic

这样 pusher -> chunkExecutor -> kafka 一个链路就出现了。下面用一张图形象表达一下:

image.png

框架地址

https://github.com/tal-tech/go-queue

同时在 go-queue 也大量使用 go-zero 的 批量处理工具库 executors

https://github.com/tal-tech/go-zero

欢迎使用 go-zero & go-queuestar 支持我们!一起构建 go-zero 生态!👍

go-zero 系列文章见『微服务实践』公众号

有疑问加站长微信联系(非本文作者)

本文来自:Segmentfault

感谢作者:kevinwan

查看原文:一招让Kafka达到最佳吞吐量

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
599 次点击
暂无回复
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏