分享
  1. 首页
  2. 文章

open-falcon-agent源码学习

SkyWay · · 1397 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

最近学习falcon,看了源码和极客学院的视频解析,画了调用结构、关系,对主要的代码进行了注释

代码地址:https://github.com/beyondskyw...

标签(空格分隔): falcon go


监控数据

  • 机器性能指标:cpu,mem,网卡,磁盘......

  • 业务监控

  • 开源软件状态:Nginx,Redis,MySQL

  • snmp采集网络设备指标

设计原理

  • 自发现采集值

  • 不同类型数据采集分不同goroutine

  • 进程和端口通过用户配置进行监控

配置文件

  • hostname和ip默认留空,agent自动探测

  • hbs和transfer都是配置其rpc地址

  • collector网卡采集前缀

  • ignore为true时取消上报

组织结构

  • cron:间隔执行的代码,即定时任务

  • funcs:信息采集

  • g:全局数据结构

  • http:简单的dashboard的server,获取单机监控指标数据

  • plugins:插件处理机制

  • public:静态资源文件

心跳机制

  • 了解agent、plugin版本信息,方便升级

  • 获取监听的进程和端口

  • 获取本机执行的插件列表

与HBS、Transfer交互

调用关系

代码解读

  • main入口

go cron.InitDataHistory()
// 上报本机状态
cron.ReportAgentStatus()
// 同步插件
cron.SyncMinePlugins()
// 同步监控端口、路径、进程和URL
cron.SyncBuiltinMetrics()
// 后门调试agent,允许执行shell指令的ip列表
cron.SyncTrustableIps()
// 开始数据次采集
cron.Collect()
// 启动dashboard server
go http.Start()
  • ReportAgentStatus:汇报agent本身状态

// 判断hbs配置是否正常,正常则上报agent状态
if g.Config().Heartbeat.Enabled && g.Config().Heartbeat.Addr != "" {
 // 根据配置的interval间隔上报信息
 go reportAgentStatus(time.Duration(g.Config().Heartbeat.Interval) * time.Second)
}
func reportAgentStatus(interval time.Duration) {
 for {
 // 获取hostname, 出错则错误赋值给hostname
 hostname, err := g.Hostname()
 if err != nil {
 hostname = fmt.Sprintf("error:%s", err.Error())
 }
 // 请求发送信息
 req := model.AgentReportRequest{
 Hostname: hostname,
 IP: g.IP(),
 AgentVersion: g.VERSION,
 // 通过shell指令获取plugin版本,能否go实现
 PluginVersion: g.GetCurrPluginVersion(),
 }
 var resp model.SimpleRpcResponse
 // 调用rpc接口
 err = g.HbsClient.Call("Agent.ReportStatus", req, &resp)
 if err != nil || resp.Code != 0 {
 log.Println("call Agent.ReportStatus fail:", err, "Request:", req, "Response:", resp)
 }
 time.Sleep(interval)
 }
}
  • SyncMinePlugins:同步插件

func syncMinePlugins() {
 var (
 timestamp int64 = -1
 pluginDirs []string
 )
 duration := time.Duration(g.Config().Heartbeat.Interval) * time.Second
 for {
 time.Sleep(duration)
 hostname, err := g.Hostname()
 if err != nil {
 continue
 }
 req := model.AgentHeartbeatRequest{
 Hostname: hostname,
 }
 var resp model.AgentPluginsResponse
 // 调用rpc接口,返回plugin
 err = g.HbsClient.Call("Agent.MinePlugins", req, &resp)
 if err != nil {
 log.Println("ERROR:", err)
 continue
 }
 // 保证时间顺序正确
 if resp.Timestamp <= timestamp {
 continue
 }
 pluginDirs = resp.Plugins
 // 存放时间保证最新
 timestamp = resp.Timestamp
 if g.Config().Debug {
 log.Println(&resp)
 }
 // 无插件则清空plugin
 if len(pluginDirs) == 0 {
 plugins.ClearAllPlugins()
 }
 desiredAll := make(map[string]*plugins.Plugin)
 // 读取所有plugin
 for _, p := range pluginDirs {
 underOneDir := plugins.ListPlugins(strings.Trim(p, "/"))
 for k, v := range underOneDir {
 desiredAll[k] = v
 }
 }
 // 停止不需要的插件,启动增加的插件
 plugins.DelNoUsePlugins(desiredAll)
 plugins.AddNewPlugins(desiredAll)
 }
}
  • SyncBuiltinMetrics:同步内置metric,包括端口、目录和进程信息

func syncBuiltinMetrics() {
 var timestamp int64 = -1
 var checksum string = "nil"
 duration := time.Duration(g.Config().Heartbeat.Interval) * time.Second
 for {
 time.Sleep(duration)
 // 监控端口、目录大小、进程
 var ports = []int64{}
 var paths = []string{}
 var procs = make(map[string]map[int]string)
 var urls = make(map[string]string)
 hostname, err := g.Hostname()
 if err != nil {
 continue
 }
 req := model.AgentHeartbeatRequest{
 Hostname: hostname,
 Checksum: checksum,
 }
 var resp model.BuiltinMetricResponse
 err = g.HbsClient.Call("Agent.BuiltinMetrics", req, &resp)
 if err != nil {
 log.Println("ERROR:", err)
 continue
 }
 if resp.Timestamp <= timestamp {
 continue
 }
 if resp.Checksum == checksum {
 continue
 }
 timestamp = resp.Timestamp
 checksum = resp.Checksum
 for _, metric := range resp.Metrics {
 if metric.Metric == g.URL_CHECK_HEALTH {
 arr := strings.Split(metric.Tags, ",")
 if len(arr) != 2 {
 continue
 }
 url := strings.Split(arr[0], "=")
 if len(url) != 2 {
 continue
 }
 stime := strings.Split(arr[1], "=")
 if len(stime) != 2 {
 continue
 }
 if _, err := strconv.ParseInt(stime[1], 10, 64); err == nil {
 urls[url[1]] = stime[1]
 } else {
 log.Println("metric ParseInt timeout failed:", err)
 }
 }
 // {metric: net.port.listen, tags: port=22}
 if metric.Metric == g.NET_PORT_LISTEN {
 arr := strings.Split(metric.Tags, "=")
 if len(arr) != 2 {
 continue
 }
 if port, err := strconv.ParseInt(arr[1], 10, 64); err == nil {
 ports = append(ports, port)
 } else {
 log.Println("metrics ParseInt failed:", err)
 }
 continue
 }
 // metric: du.bs tags: path=/home/works/logs
 // du -bs /home/works/logs
 if metric.Metric == g.DU_BS {
 arr := strings.Split(metric.Tags, "=")
 if len(arr) != 2 {
 continue
 }
 paths = append(paths, strings.TrimSpace(arr[1]))
 continue
 }
 //mereic: proc.num tags: name=crond
 //或者metric: proc.num tags: cmdline=cfg.json
 if metric.Metric == g.PROC_NUM {
 arr := strings.Split(metric.Tags, ",")
 tmpMap := make(map[int]string)
 for i := 0; i < len(arr); i++ {
 if strings.HasPrefix(arr[i], "name=") {
 tmpMap[1] = strings.TrimSpace(arr[i][5:])
 } else if strings.HasPrefix(arr[i], "cmdline=") {
 tmpMap[2] = strings.TrimSpace(arr[i][8:])
 }
 }
 procs[metric.Tags] = tmpMap
 }
 }
 g.SetReportUrls(urls)
 g.SetReportPorts(ports)
 g.SetReportProcs(procs)
 g.SetDuPaths(paths)
 }
}
  • SyncTrustableIps:同步可信IP列表
    请求获取远程访问执行shell命令的IP白名单,在通过http/run.go调用shell命令是会判断请求IP是否可信

func syncTrustableIps() {
 duration := time.Duration(g.Config().Heartbeat.Interval) * time.Second
 for {
 time.Sleep(duration)
 var ips string
 err := g.HbsClient.Call("Agent.TrustableIps", model.NullRpcRequest{}, &ips)
 if err != nil {
 log.Println("ERROR: call Agent.TrustableIps fail", err)
 continue
 }
 // 设置到本地可信IP列表
 g.SetTrustableIps(ips)
 }
}
  • FuncsAndInterval:拆分不同的采集函数集,方便通过不同goroutine运行

// 间隔internal时间执行fs中的函数
type FuncsAndInterval struct {
 Fs []func() []*model.MetricValue
 Interval int
}
var Mappers []FuncsAndInterval
// 根据调用指令类型和是否容易被挂起而分类(通过不同的goroutine去执行,避免相互之间的影响)
func BuildMappers() {
 interval := g.Config().Transfer.Interval
 Mappers = []FuncsAndInterval{
 FuncsAndInterval{
 Fs: []func() []*model.MetricValue{
 AgentMetrics,
 CpuMetrics,
 NetMetrics,
 KernelMetrics,
 LoadAvgMetrics,
 MemMetrics,
 DiskIOMetrics,
 IOStatsMetrics,
 NetstatMetrics,
 ProcMetrics,
 UdpMetrics,
 },
 Interval: interval,
 },
 // 容易出问题
 FuncsAndInterval{
 Fs: []func() []*model.MetricValue{
 DeviceMetrics,
 },
 Interval: interval,
 },
 // 调用相同指令
 FuncsAndInterval{
 Fs: []func() []*model.MetricValue{
 PortMetrics,
 SocketStatSummaryMetrics,
 },
 Interval: interval,
 },
 FuncsAndInterval{
 Fs: []func() []*model.MetricValue{
 DuMetrics,
 },
 Interval: interval,
 },
 FuncsAndInterval{
 Fs: []func() []*model.MetricValue{
 UrlMetrics,
 },
 Interval: interval,
 },
 }
}
  • Colleet:配置信息读取,读取Mapper中的FuncsAndInterval,根据func调用采集函数,采集所有信息(并非先过滤采集项),从所有采集到的数据中过滤ignore的项,并上报到transfer。

func Collect() {
 // 配置信息判断
 if !g.Config().Transfer.Enabled {
 return
 }
 if len(g.Config().Transfer.Addrs) == 0 {
 return
 }
 // 读取mapper中的FuncsAndInterval集,并通过不同的goroutine运行
 for _, v := range funcs.Mappers {
 go collect(int64(v.Interval), v.Fs)
 }
}
// 间隔采集信息
func collect(sec int64, fns []func() []*model.MetricValue) {
 // 启动断续器,间隔执行
 t := time.NewTicker(time.Second * time.Duration(sec)).C
 for {
 <-t
 hostname, err := g.Hostname()
 if err != nil {
 continue
 }
 mvs := []*model.MetricValue{}
 // 读取忽略metric名单
 ignoreMetrics := g.Config().IgnoreMetrics
 // 从funcs的list中取出每个采集函数
 for _, fn := range fns {
 // 执行采集函数
 items := fn()
 if items == nil {
 continue
 }
 if len(items) == 0 {
 continue
 }
 // 读取采集数据,根据忽略的metric忽略部分采集数据
 for _, mv := range items {
 if b, ok := ignoreMetrics[mv.Metric]; ok && b {
 continue
 } else {
 mvs = append(mvs, mv)
 }
 }
 }
 // 获取上报时间
 now := time.Now().Unix()
 // 设置上报采集项的间隔、agent主机、上报时间
 for j := 0; j < len(mvs); j++ {
 mvs[j].Step = sec
 mvs[j].Endpoint = hostname
 mvs[j].Timestamp = now
 }
 // 调用transfer发送采集数据
 g.SendToTransfer(mvs)
 }
}
  • 采集信息结构

type MetricValue struct {
 Endpoint string // 主机名
 Metric string // 信息标识cpu.idle、mem.memtotal等
 Value interface{} // 采集结果
 Step int64 // 该项上报间隔
 Type string // GAUGE或COUNTER
 Tags string // 配置报警策略
 Timestamp int64 // 此次上报时间
}
  • 采集信息组成metricValue结构

func NewMetricValue(metric string, val interface{}, dataType string, tags ...string) *model.MetricValue {
 mv := model.MetricValue{
 Metric: metric,
 Value: val,
 Type: dataType,
 }
 size := len(tags)
 if size > 0 {
 mv.Tags = strings.Join(tags, ",")
 }
 return &mv
}
// 原值类型
func GaugeValue(metric string, val interface{}, tags ...string) *model.MetricValue {
 return NewMetricValue(metric, val, "GAUGE", tags...)
}
// 计数器类型
func CounterValue(metric string, val interface{}, tags ...string) *model.MetricValue {
 return NewMetricValue(metric, val, "COUNTER", tags...)
}
  • rpc组件

// 简单封装rpc.Cilent
type SingleConnRpcClient struct {
 sync.Mutex
 rpcClient *rpc.Client
 RpcServer string
 Timeout time.Duration
}
// 关闭rpc
func (this *SingleConnRpcClient) close() {
 if this.rpcClient != nil {
 this.rpcClient.Close()
 this.rpcClient = nil
 }
}
// 保证rpc存在,为空则重新创建, 如果server宕机, 死循环????
func (this *SingleConnRpcClient) insureConn() {
 if this.rpcClient != nil {
 return
 }
 var err error
 var retry int = 1
 for {
 if this.rpcClient != nil {
 return
 }
 // 根据timeout和server地址去连接rpc的server
 this.rpcClient, err = net.JsonRpcClient("tcp", this.RpcServer, this.Timeout)
 if err == nil {
 return
 }
 log.Printf("dial %s fail: %v", this.RpcServer, err)
 if retry > 6 {
 retry = 1
 }
 time.Sleep(time.Duration(math.Pow(2.0, float64(retry))) * time.Second)
 retry++
 }
}
// rpc client调用hbs函数
func (this *SingleConnRpcClient) Call(method string, args interface{}, reply interface{}) error {
 // 加锁保证一个agent只与server有一个连接,保证性能
 this.Lock()
 defer this.Unlock()
 // 保证rpc连接可用
 this.insureConn()
 timeout := time.Duration(50 * time.Second)
 done := make(chan error)
 go func() {
 err := this.rpcClient.Call(method, args, reply)
 done <- err
 }()
 // 超时控制
 select {
 case <-time.After(timeout):
 log.Printf("[WARN] rpc call timeout %v => %v", this.rpcClient, this.RpcServer)
 this.close()
 case err := <-done:
 if err != nil {
 this.close()
 return err
 }
 }
 return nil
}
  • Transfer部件

// 定义transfer的rpcClient对应Map, transferClients读写锁
var (
 TransferClientsLock *sync.RWMutex = new(sync.RWMutex)
 TransferClients map[string]*SingleConnRpcClient = map[string]*SingleConnRpcClient{}
)
// 发送数据到随机的transfer
func SendMetrics(metrics []*model.MetricValue, resp *model.TransferResponse) {
 rand.Seed(time.Now().UnixNano())
 // 随机transferClient发送数据,直到发送成功
 for _, i := range rand.Perm(len(Config().Transfer.Addrs)) {
 addr := Config().Transfer.Addrs[i]
 if _, ok := TransferClients[addr]; !ok {
 initTransferClient(addr)
 }
 if updateMetrics(addr, metrics, resp) {
 break
 }
 }
}
// 初始化addr对应的transferClient
func initTransferClient(addr string) {
 TransferClientsLock.Lock()
 defer TransferClientsLock.Unlock()
 TransferClients[addr] = &SingleConnRpcClient{
 RpcServer: addr,
 Timeout: time.Duration(Config().Transfer.Timeout) * time.Millisecond,
 }
}
// 调用rpc接口发送metric
func updateMetrics(addr string, metrics []*model.MetricValue, resp *model.TransferResponse) bool {
 TransferClientsLock.RLock()
 defer TransferClientsLock.RUnlock()
 err := TransferClients[addr].Call("Transfer.Update", metrics, resp)
 if err != nil {
 log.Println("call Transfer.Update fail", addr, err)
 return false
 }
 return true
}
  • 采集插件同步

// 插件信息: 路径、修改时间、运行周期(来自plugin插件)
type Plugin struct {
 FilePath string
 MTime int64
 Cycle int
}
// 插件map和调度器map
var (
 Plugins = make(map[string]*Plugin)
 PluginsWithScheduler = make(map[string]*PluginScheduler)
)
// 删除不需要的plugin
func DelNoUsePlugins(newPlugins map[string]*Plugin) {
 for currKey, currPlugin := range Plugins {
 newPlugin, ok := newPlugins[currKey]
 if !ok || currPlugin.MTime != newPlugin.MTime {
 deletePlugin(currKey)
 }
 }
}
// 添加同步时增加的plugin
func AddNewPlugins(newPlugins map[string]*Plugin) {
 for fpath, newPlugin := range newPlugins {
 // 去除重复插件
 if _, ok := Plugins[fpath]; ok && newPlugin.MTime == Plugins[fpath].MTime {
 continue
 }
 // 为新添加的插件新建调度器
 Plugins[fpath] = newPlugin
 sch := NewPluginScheduler(newPlugin)
 PluginsWithScheduler[fpath] = sch
 // 启动plugin调度
 sch.Schedule()
 }
}
func ClearAllPlugins() {
 for k := range Plugins {
 deletePlugin(k)
 }
}
func deletePlugin(key string) {
 v, ok := PluginsWithScheduler[key]
 if ok {
 // 暂停调度plugin
 v.Stop()
 delete(PluginsWithScheduler, key)
 }
 delete(Plugins, key)
}
  • 插件调度策略

// 持续间隔执行plugin
type PluginScheduler struct {
 Ticker *time.Ticker
 Plugin *Plugin
 Quit chan struct{}
}
// 根据plugin创建新的schedule
func NewPluginScheduler(p *Plugin) *PluginScheduler {
 scheduler := PluginScheduler{Plugin: p}
 scheduler.Ticker = time.NewTicker(time.Duration(p.Cycle) * time.Second)
 scheduler.Quit = make(chan struct{})
 return &scheduler
}
// plugin调度,间隔执行PluginRun,除非收到quit消息
func (this *PluginScheduler) Schedule() {
 go func() {
 for {
 select {
 case <-this.Ticker.C:
 PluginRun(this.Plugin)
 case <-this.Quit:
 this.Ticker.Stop()
 return
 }
 }
 }()
}
// 停止plugin调度
func (this *PluginScheduler) Stop() {
 close(this.Quit)
}
// 执行插件,读取插件运行返回数据并上报transfer
func PluginRun(plugin *Plugin) {
 timeout := plugin.Cycle*1000 - 500
 fpath := filepath.Join(g.Config().Plugin.Dir, plugin.FilePath)
 if !file.IsExist(fpath) {
 log.Println("no such plugin:", fpath)
 return
 }
 debug := g.Config().Debug
 if debug {
 log.Println(fpath, "running...")
 }
 cmd := exec.Command(fpath)
 var stdout bytes.Buffer
 cmd.Stdout = &stdout
 var stderr bytes.Buffer
 cmd.Stderr = &stderr
 cmd.Start()
 err, isTimeout := sys.CmdRunWithTimeout(cmd, time.Duration(timeout)*time.Millisecond)
 errStr := stderr.String()
 if errStr != "" {
 logFile := filepath.Join(g.Config().Plugin.LogDir, plugin.FilePath+".stderr.log")
 if _, err = file.WriteString(logFile, errStr); err != nil {
 log.Printf("[ERROR] write log to %s fail, error: %s\n", logFile, err)
 }
 }
 if isTimeout {
 // has be killed
 if err == nil && debug {
 log.Println("[INFO] timeout and kill process", fpath, "successfully")
 }
 if err != nil {
 log.Println("[ERROR] kill process", fpath, "occur error:", err)
 }
 return
 }
 if err != nil {
 log.Println("[ERROR] exec plugin", fpath, "fail. error:", err)
 return
 }
 // exec successfully
 data := stdout.Bytes()
 if len(data) == 0 {
 if debug {
 log.Println("[DEBUG] stdout of", fpath, "is blank")
 }
 return
 }
 var metrics []*model.MetricValue
 err = json.Unmarshal(data, &metrics)
 if err != nil {
 log.Printf("[ERROR] json.Unmarshal stdout of %s fail. error:%s stdout: \n%s\n", fpath, err, stdout.String())
 return
 }
 g.SendToTransfer(metrics)
}

有疑问加站长微信联系(非本文作者)

本文来自:Segmentfault

感谢作者:SkyWay

查看原文:open-falcon-agent源码学习

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
1397 次点击
暂无回复
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏