分享
  1. 首页
  2. 文章

golang异步kafka生产者

hansegod · · 6685 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

在实际业务场景中,为了提高系统的实时性,减轻日志存储压力,需要将日志直接生产至消息中间件,减少flume或flumted收集所导致的延时及性能压力,本文实现了一下功能:
实现了一个静态调用的异步生产者 AsyncProducer
封装了一个用于异步发送的生产器 Agent

//@description	kafka代理
//@author chenbintao
//@data	2017年09月27日	10:30	初稿
//		2017年09月27日	11:15	规范代码
//		2017年09月28日	14:15	对发送逻辑进行了优化
package kafkaAgent
import (
	"fmt"
	"log"
	"runtime/debug"
	"strings"
	"time"
	"github.com/Shopify/sarama"
)
const (
	_BROKER_LIST_ = `localhost:9092`
)
const (
	_LABEL_ = "[_kafkaAgent_]"
)
var (
	IS_DEBUG = false
	_PAUSE_ = false
)
func SetDebug(debug bool) {
	IS_DEBUG = debug
}
type Agent struct {
	flag bool
	BrokerList string
	TopicList string
	SendTimeOut time.Duration
	ReceiveTimeOut time.Duration
	AsyncProducer sarama.AsyncProducer
}
func (this *Agent) Set(BrokerList, TopicList string, SendTimeOut, ReceiveTimeOut time.Duration) bool {
	//只允许初始化一次
	if this.flag {
		return false
	}
	this.flag = true
	this.BrokerList = BrokerList
	this.TopicList = TopicList
	this.SendTimeOut = SendTimeOut
	this.ReceiveTimeOut = ReceiveTimeOut
	this.AsyncProducer = getProducer(this.BrokerList, this.SendTimeOut, true)
	if nil == this.AsyncProducer {
		return false
	}
	return this.Check()
}
func (this *Agent) Check() bool {
	if "" == this.BrokerList || "" == this.TopicList {
		return false
	}
	if 0 == this.SendTimeOut && 0 == this.ReceiveTimeOut {
		return false
	}
	return true
}
func (this *Agent) Send(msg string) bool {
	defer func() {
		if e, ok := recover().(error); ok {
			log.Println(_LABEL_, "WARN: panic in %v", e)
			log.Println(_LABEL_, string(debug.Stack()))
			this.AsyncProducer.Close()
			this.AsyncProducer = getProducer(this.BrokerList, this.SendTimeOut, true)
		}
	}()
	if !this.Check() {
		return false
	}
	return asyncProducer(
		this.AsyncProducer,
		this.TopicList,
		msg,
	)
}
//=========================================================================
// asyncProducer 异步生产者
func AsyncProducer(kafka_list, topics, s string, timeout time.Duration) bool {
	if "" == kafka_list || "" == topics {
		return false
	}
	producer := getProducer(kafka_list, timeout, false)
	if nil == producer {
		return false
	}
	defer producer.Close()
	go func(p sarama.AsyncProducer) {
		errors := p.Errors()
		success := p.Successes()
		for {
			select {
			case err := <-errors:
				if err != nil {
					if IS_DEBUG {
						log.Println(_LABEL_, err)
					}
					return
				} else {
					return
				}
			case <-success:
				return
			}
		}
	}(producer)
	return asyncProducer(producer, topics, s)
}
func asyncProducer(p sarama.AsyncProducer, topics, s string) bool {
	if nil == p {
		return false
	}
	msg := &sarama.ProducerMessage{
		Topic: topics,
		Value: sarama.ByteEncoder(s),
	}
	p.Input() <- msg
	if IS_DEBUG {
		fmt.Println(_LABEL_, msg)
	}
	return true
}
func getProducer(kafka_list string, timeout time.Duration, monitor bool) sarama.AsyncProducer {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	config.Producer.Timeout = timeout
	producer, err := sarama.NewAsyncProducer(strings.Split(kafka_list, ","), config)
	if err != nil {
		if IS_DEBUG {
			log.Println(_LABEL_, err)
		}
	}
	if monitor {
		//消费状态消息,防止死锁
		go func(producer sarama.AsyncProducer) {
			if nil == producer {
				log.Println(_LABEL_, "getProducer() producer error!")
				return
			}
			errors := producer.Errors()
			success := producer.Successes()
			for {
				select {
				case err := <-errors:
					if err != nil {
						if IS_DEBUG {
							log.Println(_LABEL_, err)
						}
						continue
					} else {
						continue
					}
				case <-success:
					continue
				}
			}
		}(producer)
	}
	return producer
}

有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
6685 次点击
暂无回复
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏