分享
  1. 首页
  2. 文章

聊聊golang的zap的Sink

go4it · · 794 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

本文主要研究一下golang的zap的Sink

Sink

zap@v1.16.0/sink.go

type Sink interface {
 zapcore.WriteSyncer
 io.Closer
}
type WriteSyncer interface {
 io.Writer
 Sync() error
}
type Writer interface {
 Write(p []byte) (n int, err error)
}
type Closer interface {
 Close() error
}

Sink接口内嵌了zapcore.WriteSyncer(Write、Sync)、io.Closer(Close)接口

RegisterSink

zap@v1.16.0/sink.go

const schemeFile = "file"
var (
 _sinkMutex sync.RWMutex
 _sinkFactories map[string]func(*url.URL) (Sink, error) // keyed by scheme
)
func init() {
 resetSinkRegistry()
}
func resetSinkRegistry() {
 _sinkMutex.Lock()
 defer _sinkMutex.Unlock()
 _sinkFactories = map[string]func(*url.URL) (Sink, error){
 schemeFile: newFileSink,
 }
}
func RegisterSink(scheme string, factory func(*url.URL) (Sink, error)) error {
 _sinkMutex.Lock()
 defer _sinkMutex.Unlock()
 if scheme == "" {
 return errors.New("can't register a sink factory for empty string")
 }
 normalized, err := normalizeScheme(scheme)
 if err != nil {
 return fmt.Errorf("%q is not a valid scheme: %v", scheme, err)
 }
 if _, ok := _sinkFactories[normalized]; ok {
 return fmt.Errorf("sink factory already registered for scheme %q", normalized)
 }
 _sinkFactories[normalized] = factory
 return nil
}

RegisterSink方法会往_sinkFactories注册指定scheme的sink factory,该factory接收url.URL返回Sink;resetSinkRegistry方法默认注册了scheme为file的newFileSink

newFileSink

zap@v1.16.0/sink.go

func newFileSink(u *url.URL) (Sink, error) {
 if u.User != nil {
 return nil, fmt.Errorf("user and password not allowed with file URLs: got %v", u)
 }
 if u.Fragment != "" {
 return nil, fmt.Errorf("fragments not allowed with file URLs: got %v", u)
 }
 if u.RawQuery != "" {
 return nil, fmt.Errorf("query parameters not allowed with file URLs: got %v", u)
 }
 // Error messages are better if we check hostname and port separately.
 if u.Port() != "" {
 return nil, fmt.Errorf("ports not allowed with file URLs: got %v", u)
 }
 if hn := u.Hostname(); hn != "" && hn != "localhost" {
 return nil, fmt.Errorf("file URLs must leave host empty or use localhost: got %v", u)
 }
 switch u.Path {
 case "stdout":
 return nopCloserSink{os.Stdout}, nil
 case "stderr":
 return nopCloserSink{os.Stderr}, nil
 }
 return os.OpenFile(u.Path, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
}

newFileSink使用os.OpenFile创建*os.File,由于*os.File拥有Write、Sync、Close方法,因而它实现了Sink接口

newSink

zap@v1.16.0/sink.go

func newSink(rawURL string) (Sink, error) {
 u, err := url.Parse(rawURL)
 if err != nil {
 return nil, fmt.Errorf("can't parse %q as a URL: %v", rawURL, err)
 }
 if u.Scheme == "" {
 u.Scheme = schemeFile
 }
 _sinkMutex.RLock()
 factory, ok := _sinkFactories[u.Scheme]
 _sinkMutex.RUnlock()
 if !ok {
 return nil, &errSinkNotFound{u.Scheme}
 }
 return factory(u)
}

newSink方法会根据rawURL解析对应的scheme,如果scheme为空则默认为file,然后从_sinkFactories找到对应的factory,创建sink返回

open

zap@v1.16.0/writer.go

func Open(paths ...string) (zapcore.WriteSyncer, func(), error) {
 writers, close, err := open(paths)
 if err != nil {
 return nil, nil, err
 }
 writer := CombineWriteSyncers(writers...)
 return writer, close, nil
}
func open(paths []string) ([]zapcore.WriteSyncer, func(), error) {
 writers := make([]zapcore.WriteSyncer, 0, len(paths))
 closers := make([]io.Closer, 0, len(paths))
 close := func() {
 for _, c := range closers {
 c.Close()
 }
 }
 var openErr error
 for _, path := range paths {
 sink, err := newSink(path)
 if err != nil {
 openErr = multierr.Append(openErr, fmt.Errorf("couldn't open sink %q: %v", path, err))
 continue
 }
 writers = append(writers, sink)
 closers = append(closers, sink)
 }
 if openErr != nil {
 close()
 return writers, nil, openErr
 }
 return writers, close, nil
}

zap.Open方法会使用newSink来创建sink作为zapcore.WriteSyncer

实例

func registerSinkDemo() {
 zap.RegisterSink("mq", mq.NewMqSink)
 writer, close, err := zap.Open("mq://192.168.99.100:9876/log")
 if err != nil {
 panic(err)
 }
 defer close()
 logger := zap.New(zapcore.NewCore(zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()), writer, zap.DebugLevel)).Sugar()
 logger.Info("hello")
}
type MqWriteSyncer struct {
 topic string
 producer rocketmq.Producer
 ctx context.Context
}
func (m *MqWriteSyncer) Close() error {
 return m.producer.Shutdown()
}
func (m *MqWriteSyncer) Write(p []byte) (n int, err error) {
 msg := &primitive.Message{
 Topic: m.topic,
 Body: p,
 }
 err = m.producer.SendOneWay(m.ctx, msg)
 return len(p), err
}
func (m *MqWriteSyncer) Sync() error {
 return nil
}
func NewMqSink(url *url.URL) (zap.Sink, error) {
 broker := fmt.Sprintf("%s:%s", url.Hostname(), url.Port())
 topic := url.Path[1:len(url.Path)]
 p, _ := rocketmq.NewProducer(
 producer.WithNameServer([]string{broker}),
 producer.WithRetry(2),
 )
 err := p.Start()
 if err != nil {
 fmt.Printf("start producer error: %s", err.Error())
 return nil, err
 }
 return &MqWriteSyncer{producer: p, ctx: context.Background(), topic: topic}, nil
}

这里通过zap.RegisterSink来注册一个mq的sink factory,然后通过zap.Open来创建MqWriteSyncer;MqWriteSyncer实现了zapcore.WriteSyncer的Write、Sync方法,同时也实现了Sink的Close方法

小结

Sink接口内嵌了zapcore.WriteSyncer(Write、Sync)、io.Closer(Close)接口;zap.RegisterSink用于注册指定scheme的sink factory,而zap.Open则会解析url来找到对应的sink factory创建对应的sink,即writer。

doc


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:go4it

查看原文:聊聊golang的zap的Sink

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
794 次点击
暂无回复
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏