1. 首页
  2. 主题
  3. Go问与答

请教下golang goroutine泄露问题?

breadHood · · 3740 次点击
golang新手, 最近在写个练手的小项目, 需求是收集指定日志目录下最新的日志文件, 通过正则过滤日志内容, 把想要的信息存到一个map里. 但是这个程序在实际运行中我发现, 由于在catLog包里读取文件内容时里面我必须使用for死循环实现实时读取文件, 导致在getNewFile包里监控到的系统事件信息无法发送给catLog包,这样如果有新的日志文件产生, 它无法切换文件. 于是我在getNewFile包调用catLog包时启用goroutine, 这样就可以收到通知了. 结果导致每次读取文件新内容时, 程序都会启动一个新的goroutine调用catLog包, 并且不会自动退出,因为catLog包里是for死循环, 一直在累积增加. 请大佬们帮忙看下下面的代码, 该怎么修改可以控制goroutine增长, 并且还能实时监控最新的日志文件. ```golang package getNewFile import ( "fmt" "log" "regexp" catlog "study/prometheus/exporter_watcherLog/catLog" "time" "github.com/fsnotify/fsnotify" ) // 实时监控指定目录下文件并读取 func GetNewFile(pattren, logDir string) { watcher, err := fsnotify.NewWatcher() if err != nil { fmt.Println(err) return } defer watcher.Close() done := make(chan bool) re := regexp.MustCompile(pattren) // Compile 解析一个正则表达式,如果成功则返回一个可用于匹配文本的 Regexp 对象。如错误会panic go func() { for { select { case event := <-watcher.Events: isMatch := re.MatchString(event.Name) if isMatch { if event.Op&fsnotify.Create == fsnotify.Create { // 监控创建文件动作 fmt.Println(event.Name) go catlog.DealLog(event.Name) } if event.Op&fsnotify.Write == fsnotify.Write { // 监控写入文件动作 fmt.Println(event.Name) go catlog.DealLog(event.Name) } } case err := <-watcher.Errors: log.Println(err) } time.Sleep(time.Second * 1) } }() err = watcher.Add(logDir) // 监控指定目录 if err != nil { log.Fatal(err) } <-done } ``` ```golang package catlog import ( "fmt" "regexp" "runtime" "time" "sync" "github.com/hpcloud/tail" ) var AllAddrLoglist map[string]ClassLogInfo var Sm sync.Map var Notify = false type ClassLogInfo struct { Host string Time string Date string Cfc string } // 实时读取文件内容, 并筛选出host, date, time, cfc存入map中 func DealLog(file string) { config := tail.Config{ ReOpen: true, // 重新打开 Follow: true, // 是否跟随 Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // 从文件哪个地方开始读 MustExist: false, // 文件不存在报错 Poll: true, // Logger: tail.DiscardingLogger, // 禁用日志记录 } tails, err := tail.TailFile(file, config) // 打开文件, 并用上面的配置 if err != nil { fmt.Printf("tail file failed, err: %v\n", err) return } var ( line *tail.Line ok bool ) ch := make(chan struct{}, 1) // 创建缓冲区大小, 控制并发, 最多发送x个消息就阻塞 // 正则匹配, 最后只筛选出host;date;time;cfc, 其余的跳过. pattern := `(^[0-9]+/[0-9]+/[0-9]+)\s+([0-9]+:[0-9]+:[0-9]+:[0-9]+)\s+[A-Z]+\s+\[.*\]\s+INFO\s+LSG.*LSG\s+svr\s+update:\s+cfc=([0-9]+),cpfc=[0-9]+,clc=[0-9]+,cplc=[0-9]+,load=[0-9]+,status=[0-9]+,addr=(\d+\.\d+\.\d+\.\d+-\d),inst=\d+\.\d+\.\d+\.\d+-\d` // re := regexp.MustCompile(pattern) // 两个唯一区别, 如错误直接panic re, err := regexp.Compile(pattern) // Compile 解析一个正则表达式,如果成功则返回一个可用于匹配文本的 Regexp 对象。如错误会返回一个错误 if err != nil { fmt.Println(err) } // 读取每行数据,最后写入到AllAddrLoglist中. for { ch <- struct{}{} // 写入消息到缓冲区 line, ok = <-tails.Lines if !ok { fmt.Printf("tail file close reopen, filename: %s\n", tails.Filename) time.Sleep(time.Second) continue } matchArr := re.FindStringSubmatch(line.Text) if matchArr != nil { // 新增判断,替代re.MatchString(line.Text),原有的影响性能 // 信息存入结构体 testI := ClassLogInfo{ Host: matchArr[4], Date: matchArr[1], Time: matchArr[2], Cfc: matchArr[3], } // 利用sync.map, 直接使用, 无需声明, 避免读写同一个map冲突 Sm.Store(matchArr[4], testI) <-ch // 从缓冲区读取消息 } } } ```
buguang01
https://github.com/buguang01
从楼主的问题中我理解,你当前只希望监听一个文件,只是这个文件随时可能会更换新的;同时新的内容可能是写在新文件中,也可能是旧文件的修改; 但是我看到楼主使用的是tail进行文件的读取,其实存在一个问题,就是对方文件都写完了,你再tail其实会丢失一些数据; 一般第三方的日志系统都是写同一个文件,然后文件写到一定大小的时候,修改文件名字,然后再创建一个原来的文件; 这样监听文件的系统就不用频繁换文件了; --- 但如果只是就楼主的问题进行探讨的话,我先说一下,楼主当前程序的几个问题: 1. DealLog函数的ch 是没有意义的。因为他是局部变量不会起到锁的作用; 2. 假设你理解的是对的,这样使用tail不会丢数据,那也不应该是每收到一个event就开一个协程,而是在收到event的时候,直接把那个文件的数据读到文件结束,然后保存到你的map中,然后继续等一次的event过来; 因为watcher底层开着一个协程监听你的目录中所有文件的变化;每次收到变化的时候,他就会发一个event给你;你直接处理就好了。 总结:如果是这样修改,你不会有开多个协程,也就不存在楼主关于协程泄漏的问题了; --- 再假设楼主的环境是可能同时多个文件被写入,希望可以并发的获取数据;方案如下: 1. 首先程序启动的时候,应该对目录下所有的文件,启动协程进行tail; 2. 然后watcher目录是否有新的文件生成,如果有,就为此启动协程进行tail; 总结:如果是这样修改,也就是需求需要一个文件一个协程。你的额外的协程数就是你的目录下的文件数,且还都不能关闭;
#2
更多评论
简单改一下,不知道理解的对不对哈,始终只监听最新的文件,所以 `eventName` 每次替换 ```go go func() { var ctx,cancel = context.WithCancel(context.Background()) var eventName string for { select { case event := &lt;-watcher.Events: // 如果是同一文件,跳过 if eventName == event.Name { continue } isMatch := re.MatchString(event.Name) if isMatch { // 更换文件了,cancel 上次的 go DealLog cancel() // 文件名更新 eventName = event.Name if event.Op&amp;fsnotify.Create == fsnotify.Create { // 监控创建文件动作 fmt.Println(event.Name) // 重新赋值 ctx, cancel = context.WithCancel(context.Background()) go DealLog(ctx, event.Name) } if event.Op&amp;fsnotify.Write == fsnotify.Write { // 监控写入文件动作 fmt.Println(event.Name) ctx, cancel = context.WithCancel(context.Background()) go DealLog(ctx, event.Name) } } case err := &lt;-watcher.Errors: log.Println(err) } time.Sleep(time.Second * 1) } }() ``` ```go // 读取每行数据,最后写入到AllAddrLoglist中. for { select { case line, ok = &lt;-tails.Lines: .... case &lt;-ctx.Done(): return } } ```
#1
谢谢大佬, 这个方法可以的, 学习了!
#3

用户登录

没有账号?注册

今日阅读排行

    加载中

一周阅读排行

    加载中