分享
  1. 首页
  2. 文章

聊聊dapr的fswatcher

codecraft · · 682 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

本文主要研究一下dapr的fswatcher

fswatcher

dapr/pkg/fswatcher/fswatcher.go

import (
 "context"
 "strings"
 "time"
 "github.com/fsnotify/fsnotify"
 "github.com/pkg/errors"
)
func Watch(ctx context.Context, dir string, eventCh chan<- struct{}) error {
 watcher, err := fsnotify.NewWatcher()
 if err != nil {
 return errors.Wrap(err, "failed to create watcher")
 }
 defer watcher.Close()
 if err := watcher.Add(dir); err != nil {
 return errors.Wrap(err, "watcher error")
 }
LOOP:
 for {
 select {
 // watch for events
 case event := <-watcher.Events:
 if event.Op == fsnotify.Create || event.Op == fsnotify.Write {
 if strings.Contains(event.Name, dir) {
 // give time for other updates to occur
 time.Sleep(time.Second * 1)
 eventCh <- struct{}{}
 }
 }
 case <-watcher.Errors:
 break LOOP
 case <-ctx.Done():
 break LOOP
 }
 }
 return nil
}
Watch方法使用fsnotify的watcher来监听文件,之后通过for循环进行select,如果监听到fsnotify.Create或者fsnotify.Write的时候判断event.Name是否包含dir,如果包含则sleep一秒然后通知eventCh

Add

github.com/fsnotify/fsnotify@v1.4.9/kqueue.go

// Add starts watching the named file or directory (non-recursively).
func (w *Watcher) Add(name string) error {
 w.mu.Lock()
 w.externalWatches[name] = true
 w.mu.Unlock()
 _, err := w.addWatch(name, noteAllEvents)
 return err
}
Add方法设置externalWatches[name]为true,然后执行addWatch(name, noteAllEvents)

addWatch

github.com/fsnotify/fsnotify@v1.4.9/kqueue.go

// addWatch adds name to the watched file set.
// The flags are interpreted as described in kevent(2).
// Returns the real path to the file which was added, if any, which may be different from the one passed in the case of symlinks.
func (w *Watcher) addWatch(name string, flags uint32) (string, error) {
 var isDir bool
 // Make ./name and name equivalent
 name = filepath.Clean(name)
 w.mu.Lock()
 if w.isClosed {
 w.mu.Unlock()
 return "", errors.New("kevent instance already closed")
 }
 watchfd, alreadyWatching := w.watches[name]
 // We already have a watch, but we can still override flags.
 if alreadyWatching {
 isDir = w.paths[watchfd].isDir
 }
 w.mu.Unlock()
 if !alreadyWatching {
 fi, err := os.Lstat(name)
 if err != nil {
 return "", err
 }
 // Don't watch sockets.
 if fi.Mode()&os.ModeSocket == os.ModeSocket {
 return "", nil
 }
 // Don't watch named pipes.
 if fi.Mode()&os.ModeNamedPipe == os.ModeNamedPipe {
 return "", nil
 }
 // Follow Symlinks
 // Unfortunately, Linux can add bogus symlinks to watch list without
 // issue, and Windows can't do symlinks period (AFAIK). To maintain
 // consistency, we will act like everything is fine. There will simply
 // be no file events for broken symlinks.
 // Hence the returns of nil on errors.
 if fi.Mode()&os.ModeSymlink == os.ModeSymlink {
 name, err = filepath.EvalSymlinks(name)
 if err != nil {
 return "", nil
 }
 w.mu.Lock()
 _, alreadyWatching = w.watches[name]
 w.mu.Unlock()
 if alreadyWatching {
 return name, nil
 }
 fi, err = os.Lstat(name)
 if err != nil {
 return "", nil
 }
 }
 watchfd, err = unix.Open(name, openMode, 0700)
 if watchfd == -1 {
 return "", err
 }
 isDir = fi.IsDir()
 }
 const registerAdd = unix.EV_ADD | unix.EV_CLEAR | unix.EV_ENABLE
 if err := register(w.kq, []int{watchfd}, registerAdd, flags); err != nil {
 unix.Close(watchfd)
 return "", err
 }
 if !alreadyWatching {
 w.mu.Lock()
 w.watches[name] = watchfd
 w.paths[watchfd] = pathInfo{name: name, isDir: isDir}
 w.mu.Unlock()
 }
 if isDir {
 // Watch the directory if it has not been watched before,
 // or if it was watched before, but perhaps only a NOTE_DELETE (watchDirectoryFiles)
 w.mu.Lock()
 watchDir := (flags&unix.NOTE_WRITE) == unix.NOTE_WRITE &&
 (!alreadyWatching || (w.dirFlags[name]&unix.NOTE_WRITE) != unix.NOTE_WRITE)
 // Store flags so this watch can be updated later
 w.dirFlags[name] = flags
 w.mu.Unlock()
 if watchDir {
 if err := w.watchDirectoryFiles(name); err != nil {
 return "", err
 }
 }
 }
 return name, nil
}
addWatch方法针对尚未watch的执行os.Lstat(name)及unix.Open(name, openMode, 0700);之后注册registerAdd;另外针对isDir的情况执行watchDirectoryFiles

watchDirectoryFiles

github.com/fsnotify/fsnotify@v1.4.9/kqueue.go

// watchDirectoryFiles to mimic inotify when adding a watch on a directory
func (w *Watcher) watchDirectoryFiles(dirPath string) error {
 // Get all files
 files, err := ioutil.ReadDir(dirPath)
 if err != nil {
 return err
 }
 for _, fileInfo := range files {
 filePath := filepath.Join(dirPath, fileInfo.Name())
 filePath, err = w.internalWatch(filePath, fileInfo)
 if err != nil {
 return err
 }
 w.mu.Lock()
 w.fileExists[filePath] = true
 w.mu.Unlock()
 }
 return nil
}
watchDirectoryFiles遍历files,挨个执行internalWatch

internalWatch

github.com/fsnotify/fsnotify@v1.4.9/kqueue.go

func (w *Watcher) internalWatch(name string, fileInfo os.FileInfo) (string, error) {
 if fileInfo.IsDir() {
 // mimic Linux providing delete events for subdirectories
 // but preserve the flags used if currently watching subdirectory
 w.mu.Lock()
 flags := w.dirFlags[name]
 w.mu.Unlock()
 flags |= unix.NOTE_DELETE | unix.NOTE_RENAME
 return w.addWatch(name, flags)
 }
 // watch file to mimic Linux inotify
 return w.addWatch(name, noteAllEvents)
}
internalWatch针对dir设置的flag为NOTE_DELETE、NOTE_RENAME

小结

dapr的fswatcher使用fsnotify的watcher来监听文件,之后通过for循环进行select,如果监听到fsnotify.Create或者fsnotify.Write的时候判断event.Name是否包含dir,如果包含则sleep一秒然后通知eventCh。

doc


有疑问加站长微信联系(非本文作者)

本文来自:Segmentfault

感谢作者:codecraft

查看原文:聊聊dapr的fswatcher

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
682 次点击
暂无回复
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏