一款比较强大的日志监控告警工具

作者:mdo 发布时间: 2025-11-14 阅读量:3

详解Nightingale安装与配置

前置准备

主流的日志采集都是通过skywalking采集日志到elasticsearch,如下架构图所示:

而本文的案例则是通过elasticsearch采集系统程序运行日志,并按照Nightingale协定的规则配置定时采集任务,一旦感知到elasticsearch日志中存在错误日志则发出告警:

为了更好的演示夜莺如何根据采集日志并发出告警,笔者这里也简单介绍一下本文数据源elasticsearch的的配置。本文elasticsearch选用版本为7.12.0,在该版本的es上笔者刷入如下索引,可以看到这款索引很好的模拟了日常采集日志的常见数据:

  1. 微服务名称
  2. 日志消息
  3. 日志时间
  4. 日志级别
--  创建索引
curl -X PUT "localhost:9200/java_app_logs" -H 
{
  "mappings": {
    "properties": {
      "service_name": {
        "type": "keyword"
      },
      "log_message": {
        "type": "text"
      },
      "timestamp": {
        "type": "date"
      }
    }
  }
}


-- 添加日志级别
curl -X PUT "localhost:9200/java_app_logs/_mapping" -H 
{
  "properties": {
    "log_level": {
      "type": "keyword"
    }
  }
}

完成必要的索引创建后,我们就可以按需刷入如下请求建立文档模拟用户登录成功和失败的消息:

-- 添加日志

curl -X POST "localhost:9200/java_app_logs/_doc" -H 
{
  "service_name": "user-service",
  "log_message": "用户登录成功,用户ID: 12345",
  "log_level": "INFO",
  "timestamp": "'$(date -u +"%Y-%m-%dT%H:%M:%S.%3NZ")'"
}


-- 登录失败的日志
curl -X POST "localhost:9200/java_app_logs/_doc" -H 
{
  "service_name": "user-service",
  "log_message": "用户登录失败,用户名: testuser,原因: 密码错误",
  "log_level": "ERROR", 
  "timestamp": "'$(date -u +"%Y-%m-%dT%H:%M:%S.%3NZ")'"
}

自此,我们就有了一份可进行采集监控的es数据源,就可以开始通过夜莺进行日志监控告警配置了。

二进制安装

因为笔者采用Linux服务器部署且采用二进制的方式安装,所以我们需要到github下载最新版本,对应的下载地址为:github.com/ccfos/night…

将下载好的压缩包进行解压:

tar zxvf n9e-v8.4.0-linux-amd64.tar.gz

然后我们直接进入bin目录键入如下指令就可以将夜莺启动了:

./n9e

随后我们通过17000端口即可访问Nightingale登录界面,默认情况下Nightingale的账户和密码分别是:

账户:root
密码:root.2020

明确可以正常启动,读者可以使用如下指令以后台的方式启动:

cd /opt/n9e && nohup ./n9e &> n9e.log &

数据源配置

登录管理界面之后,就可以将es数据源引入进行监控告警管理了,通过集成中心定位到数据源,点击进入数据源配置界面:

随后点击新增并选择ElasticSearch数据源:

根据提示依次配置:

  1. 数据源名称
  2. 访问地址
  3. 版本信息

明确无误后,点击测试连通性并保存:

如下图所示,这样就说明数据源添加成功了,自此我们的Nightingale就可以针对日志数据源进行监控告警规则配置了:

日志查询

为了明确我们es写入的日志能够被准确的查询到,我们可以先通过Nightingale的日志页面针对针对该数据源进行日志查询,以笔者为例,对应的配置为:

  1. 数据源选择elasticsearch数据源
  2. 选择索引模式即indices
  3. 索引使用java_app_logs
  4. 日志过滤条件为日志级别为错误级别的即log_level:"ERROR"

对应的配置和输出结果如下,由此可确定笔者的配置没有任何问题:

告警规则配置与调测

重头戏来了,明确错误日志查询无误后,我们就可以按照我们上述的调测针对error级别的日志配置相应告警规则了,按照我们需求的说法,即一旦查询到错误级别的日志超过1条则直接发出告警,我们可以到告警面板选择规则管理配置监控告警,以笔者为例首先指明规则为错误日志监控告警:

针对规则配置,相应配置为:

  1. 选择数据源类型为elasticsearch数据源
  2. 选择精确匹配指定数据源为我们的elasticsearch数据源
  3. 查询统计项指明索引为java_app_logs且过滤条件为过滤出错误类型日志log_level:"ERROR"
  4. 时间间隔设置为30min以内的数据

明确这个时间后,我们可以在es上刷几条错误日志看看规则配置是否可以正确执行:

curl -X POST "localhost:9200/java_app_logs/_doc" -H 
{
  "service_name": "user-service",
  "log_message": "用户登录失败,用户名: testuser,原因: 密码错误",
  "log_level": "ERROR", 
  "timestamp": "'$(date -u +"%Y-%m-%dT%H:%M:%S.%3NZ")'"
}

可以看到通过表达式我们拿到了30条数据,说明这个规则配置没有问题:

针对阈值判断,笔者指定规则为上述规则大于1也就是错误日志大于0条则触发告警,对应这个监控频率为1min一次,持续时长设置为0即代表只要出现一次直接告警。

这里我们也补充说明一下持续时长的概念,按照官网的说法持续时长即代表该规则为真时且持续配置的时间后才触发告警,例如我们1min执行一次,持续时长配置为120s即代表定时任务两次采集都收到错误级别日志大于1才触发告警。

当然笔者这里也是出于简单,直接配置为0:

其他配置全部默认保存即可。

告警验收

此时,一旦触发告警我们就可以在告警事件上看到事件,因为夜莺默认情况下都会将事件存储在缓存中,基于这种设计理念我们可以将实时告警在夜莺平台上对接各种方式通知用户:

  1. 企业微信
  2. 阿里云短信
  3. 邮件

基于源码详解夜莺工作机制

日志告警预览查询原理

通过上述的实践,我们基本了解了夜莺的基本使用方式,为了更好的帮助读者理解夜莺这个开源告警的工作原理,笔者也将Nightingale的源码克隆到本地针对几个比较核心的部分进入深入的拆解分析。

首先我们先来说说数据预览这一块,因为Nightingale的数值提取针对支持对es日志进行count、max、min等多种日志数据收敛提取方式,然后通过数据源预览即可得到查询结果:

通过浏览器控台,笔者定位到对应的请求映射为http://127.0.0.1:17000/api/n9e/ds-query,同时我们也可以看到请求参数,如下所示,这里笔者也针对说明一下如下几个参数的含义:

  1. cate:指明数据源类型为elasticsearch
  2. datasource_id:规则配置使用的数据源为id为1,也就是我们首次配置的elasticsearch数据源
  3. query:这个json块比较重要,它说明我们配置的规则名为A,索引类型index_type为索引类型而非索引匹配模式,然后index指明为java_app_logs,然后就是kql配置规则和提取方式为count
  4. date_field:指明采集的时间字段用索引中的timestamp

其他参数则是轮询间隔和基于这个间隔生成的起止时间:

{
  "cate": "elasticsearch",
  "datasource_id": 1,
  "query": [
    {
      "ref": "A",
      "index_type": "index",
      "index": "java_app_logs",
      "filter": "log_level:"ERROR"",
      "value": {
        "func": "count"
      },
      "date_field": "timestamp",
      "interval": 1800,
      "start": 1762532762,
      "end": 1762534562
    }
  ]
}

返回结果如下,通过整体结构我们可以看出,对应A规则的在17625312002025年11月8日 00:00:00查出count22

{
    "dat": [
        {
            "ref": "A",
            "metric": {
                "__name__": "A__count"
            },
            "values": [
                [
                    1762531200,
                    22
                ]
            ],
            "query": "map[date_field:timestamp end:1.762534562e+09 filter:log_level:"ERROR" index:java_app_logs index_type:index interval:1800 ref:A start:1.762532762e+09 value:map[func:count]]"
        }
    ],
    "err": ""
}

明确这个接口出参和入参之后,我们就可以针对该接口进行深入分析了,本质上Nightingale数值提取查询就是通过上述请求参数生成es的restful请求参数,并通过返回结果中的聚合通提取到规则对应的count、max、min等信息然后返回给用户:

对应的我们可以在router.go文件中看到这个请求的入口,可以看到该请求映射本质上都是通过QueryData这个函数处理的:

  
   pages.POST("/ds-query", rt.QueryData)

步入QueryData,即可看到如下几个步骤:

  1. 将请求参数绑定到变量f这个QueryParam结构体上,本质上就是将上述的入参进行一个一一对应的封装
  2. 通过QueryDataConcurrently发起es查询请求
  3. 将查询结果返回
func (rt *Router) QueryData(c *gin.Context) {
 
 var f models.QueryParam
 
 ginx.BindJSON(c, &f)
 
 resp, err := QueryDataConcurrently(rt.Center.AnonymousAccess.PromQuerier, c, f)
 
 
 
 ginx.NewRender(c).Data(resp, nil)
}

而QueryDataConcurrently内部逻辑也比较简单,因为我们的http请求入参中的query是个数组,所以该函数会拿到QueryParam中的query数组进行遍历,生成一个个协程发起请求并通过倒计时门栓阻塞等待,当所有请求结果写入切片后,直接将切片resp返回:

func QueryDataConcurrently(anonymousAccess bool, ctx *gin.Context, f models.QueryParam) ([]models.DataResp, error) {
 
 var resp []models.DataResp
 
 for _, q := range f.Querys {
  

 
  
  wg.Add(1)
  
  go func(query interface{}) {
   defer wg.Done()
   
   datas, err := plug.QueryData(ctx.Request.Context(), query)
   

  
   
   resp = append(resp, datas...)
   mu.Unlock()
  }(q)
 }
 
 wg.Wait()

 if len(errs) > 0 {
  return nil, errs[0]
 }

 
 
 return resp, nil
}

通过上述源码可以看到每一个协程都是通过plug.QueryData(ctx.Request.Context(), query)发起es请求的,实际上这段逻辑都是在eslike.go上完成的,对应的执行步骤为就是:

  1. 基于filter生成es的must表达式
  2. 基于timestamp和interval生成range查询
  3. 基于1、2构建一个bool查询
  4. 通过参数value指明的count聚合,构建出一个桶聚合,指明不同时间区间的符合要求的文档数

对应的参数解析映射如下:

最后es会返回类似如下的一个结构体,Nightingale就会拿出这个doc_count作为结果封装返回:

"aggregations" : {
    "ts" : {
      "buckets" : [
        {
          "key_as_string" : "2025-11-05T14:30:00.000Z",
          "key" : 1762353000000,
          "doc_count" : 10
        }
      ]
    }
  }

对应的逻辑参见如下eslike.go的QueryData,和上述说明一致,读者可结合注释阅读理解:

func QueryData(ctx context.Context, queryParam interface{}, cliTimeout int64, version string, search SearchFunc) ([]models.DataResp, error) {
 //......
 //生成range对象,结构体类似于
 /**
 range": {
           "timestamp": {
             "format": "epoch_millis",
             "from": 1762353000000,
             "include_lower": true,
             "include_upper": true,
             "to": 1762353600000
           }
         }
 */
 q := elastic.NewRangeQuery(param.DateField)
 //......
 //生成时间范围并给出返回对应的时间单位
 q.Gte(time.Unix(start, 0).UnixMilli())
 q.Lte(time.Unix(end, 0).UnixMilli())
 q.Format("epoch_millis")

  //......
 //生成bool查询并基于fiter构建出must子句,传入q即将range查询存入filter子句中
 /**
 "query": {
     "bool": {
       "filter": {
         "range": {
           "timestamp": {
             "format": "epoch_millis",
             "from": 1762353000000,
             "include_lower": true,
             "include_upper": true,
             "to": 1762353600000
           }
         }
       },
       "must": {
         "query_string": {
           "query": "log_level:"ERROR""
         }
       }
     }
   }
  */
 queryString := GetQueryString(param.Filter, q)

 var aggr elastic.Aggregation
 switch param.MetricAggr.Func {
 case "avg":
  aggr = elastic.NewAvgAggregation().Field(field)
 //......
  aggr = elastic.NewSumAggregation().Field(field)
 case "count":
  
  aggr = elastic.NewValueCountAggregation().Field(field)
//......
 default:
  return nil, fmt.Errorf("func %s not support", param.MetricAggr.Func)
 }
 //生成聚合桶查询,每个桶至少要有一个文档,少了就不显示
 tsAggr := elastic.NewDateHistogramAggregation().
  Field(param.DateField).
  MinDocCount(1)

 //......
 //构建查询参数
 searchSource := elastic.NewSearchSource().
  Query(queryString).
  Aggregation("ts", tsAggr) //设置自定义聚合名称为ts
 //......
 //发起请求
 result, err := search(ctx, indexArr, searchSource, param.Timeout, param.MaxShard)
 //......
 /** 提取ts中的bucket桶提取count和key也就是时间生成item返回
 "aggregations" : {
     "ts" : {
       "buckets" : [
         {
           "key_as_string" : "2025-11-05T14:30:00.000Z",
           "key" : 1762353000000,
           "doc_count" : 10
         }
       ]
     }
   }
  */
 js, err := simplejson.NewJson(result.Aggregations["ts"])
 //......
 bucketsData, err := js.Get("buckets").Array()
 return items, nil
}

告警规则添加

了解了规则配置解析之后,我们再来了解后续步骤,即如何将规则持久化到Nightingale底层的数据库中,让告警监控的工作协程能够获取到这个规则进行定时查询和告警消息缓存。

对应的我们通过浏览器控台定位到接口映射为http://127.0.0.1:17000/api/n9e/busi-group/1/alert-rules,请求参数为一个比较大的JSON数组,为了更好的说明和解析,这里笔者也给出几个比较核心的部分以便于读者更准确的理解规则的存储过程。

先来看看参数的第一部分,可以看到这个规则通过cate即category指明数据源类型为elasticsearch,并通过datasource这个数组块指明等价匹配数据源-1也就是elasticsearch

"cate": "elasticsearch",
  "datasource_queries": [{
    "match_type": 0, 
    "op": "in", 
    "values": [
      1 
    ]
  }],

然后就是规则配置,对应的rule_configJSON参数如下,这里笔者抽出核心的两个部分,第一个部分也就是上面规则查询的参数,对应的明细笔者上文已经详细解释过了,这里就不多做赘述,这里我们着重说明一下trigger,其内部有个expressions结合3个参数语义和浏览器界面即知晓这个就是触发条件的配置,他告知Nightingale在查询count大于0的时候即可触发告警:

"rule_config": {
    "queries": [{ //查询条件参数
      "prom_ql": "",
      "severity": 2,
      "ref": "A",
      "index_type": "index",
      "value": {
        "func": "count"
      },
      "unit": "none",
      "index": "java_app_logs",
      "date_field": "timestamp",
      "filter": "log_level:"ERROR"",
      "interval": 1800
    }],
   //.......
    "triggers": [{
      "mode": 0,
      "expressions": [{ //当规则A查出来的值大于0时触发告警
        "ref": "A",
        "comparisonOperator": ">",
        "value": 0,
        "logicalOperator": "&&"
      }],
      "severity": 1, //一级告警
      "recover_config": {
        "judge_type": 0
      },
      "join_ref": "A",
      "exp": "$A > 0"
    }],
    //.......
  },

最后一个部分也就是规则配置的调度算法,对应参数如下:

  1. cron_pattern指明每15s执行一次
  2. prom_for_duration:持续时间为60s也就是4次调度都触发告警与之则告警
  3. enable_days_of_weeks:执行周期为一整周即周一到周日都有
  4. enable_stimes: 开始时间
  5. enable_etimes:结束时间
 "cron_pattern": "@every 15s",
  "prom_for_duration": 60,
  "enable_days_of_weeks": [
    [
      "0",
      "1",
      "2",
      "3",
      "4",
      "5",
      "6"
    ]
  ],
  "enable_stimes": [
    "00:00"
  ],
  "enable_etimes": [
    "00:00"
  ],

明确参数后,我们通过接口映射定位到后端的执行函数,也就是alertRuleAddByFE方法,这里笔者也贴出接口对应的映射配置和实际执行函数,保存规则的逻辑比较简单,即:

  1. 解析参数并判空
  2. 调用alertRuleAdd保存规则
  3. 返回执行结果
pages.POST("/busi-group/:id/alert-rules", rt.auth(), rt.user(), rt.perm("/alert-rules/add"), rt.bgrw(), rt.alertRuleAddByFE)


func (rt *Router) alertRuleAddByFE(c *gin.Context) {
 username := c.MustGet("username").(string)

 var lst []models.AlertRule
 
 ginx.BindJSON(c, &lst)
 
 count := len(lst)
 if count == 0 {
  ginx.Bomb(http.StatusBadRequest, "input json is empty")
 }

 bgid := ginx.UrlParamInt64(c, "id") 
 
 reterr := rt.alertRuleAdd(lst, username, bgid, c.GetHeader("X-Language"))
 
 ginx.NewRender(c).Data(reterr, nil)
}

宏观的了解整个流程之后,我们还是需要深入细节了解alertRuleAdd实现细节,其实这段逻辑和我们java开发日常的crud接口都差不多,本质上就是将参数绑定到go语言的dto上保存到数据库(默认为sqllite),对应的映射转换细节如下:

  1. EnableStime和EnableEtime作为任务起止时间直接平迁
  2. 执行周期转为空格分隔
  3. 核心的告警规则查询条件和调度时间配置json即rule config直接序列化为JSON写入

几个核心转换过程笔者也已图解的方式展示了一下,读者结合说明了解一下:

对应笔者也给出router_alert_rule.goalertRuleAdd的实现,可以看到如下步骤:

  1. 这个函数内部设置了alertRule对象的CreateByUpdateBy
  2. 调用FE2DB生执行前端参数转为上图所示的数据结构
  3. 然后AlertRule调用Add写入db中,很明显这种设计让AlertRule具备持久化的能力,是一种具备充血模型的设计理念
func (rt *Router) alertRuleAdd(lst []models.AlertRule, username string, bgid int64, lang string) map[string]string {
 count := len(lst) 
 
 reterr := make(map[string]string)
&nbsp;for&nbsp;i&nbsp;:=&nbsp;0;&nbsp;i&nbsp;<&nbsp;count;&nbsp;i++&nbsp;{
&nbsp;&nbsp;lst[i].Id&nbsp;=&nbsp;0&nbsp;
&nbsp;&nbsp;lst[i].GroupId&nbsp;=&nbsp;bgid
&nbsp;&nbsp;if&nbsp;username&nbsp;!=&nbsp;""&nbsp;{
&nbsp;&nbsp;&nbsp;lst[i].CreateBy&nbsp;=&nbsp;username&nbsp;
&nbsp;&nbsp;&nbsp;lst[i].UpdateBy&nbsp;=&nbsp;username&nbsp;
&nbsp;&nbsp;}
&nbsp;&nbsp;
&nbsp;&nbsp;if&nbsp;err&nbsp;:=&nbsp;lst[i].FE2DB();&nbsp;err&nbsp;!=&nbsp;nil&nbsp;{
&nbsp;&nbsp;&nbsp;reterr[lst[i].Name]&nbsp;=&nbsp;i18n.Sprintf(lang,&nbsp;err.Error())
&nbsp;&nbsp;&nbsp;continue
&nbsp;&nbsp;}
&nbsp;&nbsp;
&nbsp;&nbsp;if&nbsp;err&nbsp;:=&nbsp;lst[i].Add(rt.Ctx);&nbsp;err&nbsp;!=&nbsp;nil&nbsp;{
&nbsp;&nbsp;&nbsp;reterr[lst[i].Name]&nbsp;=&nbsp;i18n.Sprintf(lang,&nbsp;err.Error())
&nbsp;&nbsp;}&nbsp;else&nbsp;{
&nbsp;&nbsp;&nbsp;reterr[lst[i].Name]&nbsp;=&nbsp;""
&nbsp;&nbsp;}
&nbsp;}
&nbsp;return&nbsp;reterr
}

结合上述的整体说明笔者也给出FE2DB的函数实现细节,大体就是上图所示的映射转换,读者可结合注释回顾一下:

func&nbsp;(ar&nbsp;*AlertRule)&nbsp;FE2DB()&nbsp;error&nbsp;{
&nbsp;//如果起止时间存在则将起止时间设置到调用的ar上,也就是我们配置的enable_stimes和enable_etimes
&nbsp;if&nbsp;len(ar.EnableStimesJSON)&nbsp;>&nbsp;0&nbsp;{
&nbsp;&nbsp;ar.EnableStime&nbsp;=&nbsp;strings.Join(ar.EnableStimesJSON,&nbsp;"&nbsp;")
&nbsp;&nbsp;ar.EnableEtime&nbsp;=&nbsp;strings.Join(ar.EnableEtimesJSON,&nbsp;"&nbsp;")
&nbsp;}&nbsp;else&nbsp;{
&nbsp;&nbsp;ar.EnableStime&nbsp;=&nbsp;ar.EnableStimeJSON
&nbsp;&nbsp;ar.EnableEtime&nbsp;=&nbsp;ar.EnableEtimeJSON
&nbsp;}
&nbsp;//按照空格设置启用的星期,按照空格进行拼接,对应参数为&nbsp;"enable_days_of_weeks":&nbsp;[
&nbsp;//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;[
&nbsp;//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;"0",
&nbsp;//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;"1",
&nbsp;//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;"2",
&nbsp;//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;"3",
&nbsp;//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;"4",
&nbsp;//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;"5",
&nbsp;//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;"6"
&nbsp;//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;]
&nbsp;//&nbsp;&nbsp;&nbsp;&nbsp;],
&nbsp;if&nbsp;len(ar.EnableDaysOfWeeksJSON)&nbsp;>&nbsp;0&nbsp;{
&nbsp;&nbsp;for&nbsp;i&nbsp;:=&nbsp;0;&nbsp;i&nbsp;<&nbsp;len(ar.EnableDaysOfWeeksJSON);&nbsp;i++&nbsp;{
&nbsp;&nbsp;&nbsp;if&nbsp;len(ar.EnableDaysOfWeeksJSON)&nbsp;==&nbsp;1&nbsp;{
&nbsp;&nbsp;&nbsp;&nbsp;ar.EnableDaysOfWeek&nbsp;=&nbsp;strings.Join(ar.EnableDaysOfWeeksJSON[i],&nbsp;"&nbsp;")
&nbsp;&nbsp;&nbsp;}&nbsp;else&nbsp;{
&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;i&nbsp;==&nbsp;len(ar.EnableDaysOfWeeksJSON)-1&nbsp;{
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;ar.EnableDaysOfWeek&nbsp;+=&nbsp;strings.Join(ar.EnableDaysOfWeeksJSON[i],&nbsp;"&nbsp;")
&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;else&nbsp;{
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;ar.EnableDaysOfWeek&nbsp;+=&nbsp;strings.Join(ar.EnableDaysOfWeeksJSON[i],&nbsp;"&nbsp;")&nbsp;+&nbsp;";"
&nbsp;&nbsp;&nbsp;&nbsp;}
&nbsp;&nbsp;&nbsp;}
&nbsp;&nbsp;}
&nbsp;}&nbsp;else&nbsp;{
&nbsp;&nbsp;ar.EnableDaysOfWeek&nbsp;=&nbsp;strings.Join(ar.EnableDaysOfWeekJSON,&nbsp;"&nbsp;")
&nbsp;}

&nbsp;//......

&nbsp;

&nbsp;//将rule_config转为json串绑定到RuleConfig上
&nbsp;if&nbsp;ar.RuleConfigJson&nbsp;!=&nbsp;nil&nbsp;{
&nbsp;&nbsp;b,&nbsp;err&nbsp;:=&nbsp;json.Marshal(ar.RuleConfigJson)
&nbsp;&nbsp;if&nbsp;err&nbsp;!=&nbsp;nil&nbsp;{
&nbsp;&nbsp;&nbsp;return&nbsp;fmt.Errorf("marshal&nbsp;rule_config&nbsp;err:%v",&nbsp;err)
&nbsp;&nbsp;}
&nbsp;&nbsp;//绑定rule规则&nbsp;"rule_config":&nbsp;{
&nbsp;&nbsp;//&nbsp;&nbsp;&nbsp;&nbsp;"queries":&nbsp;[{
&nbsp;&nbsp;//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;"prom_ql":&nbsp;"",
&nbsp;&nbsp;//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;"severity":&nbsp;2,
&nbsp;&nbsp;//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;"ref":&nbsp;"A",
&nbsp;&nbsp;//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;"index_type":&nbsp;"index",
&nbsp;&nbsp;//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;"value":&nbsp;{
&nbsp;&nbsp;//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;"func":&nbsp;"count"
&nbsp;&nbsp;//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;},
&nbsp;&nbsp;//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;"unit":&nbsp;"none",
&nbsp;&nbsp;//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;"index":&nbsp;"java_app_logs",
&nbsp;&nbsp;//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;"date_field":&nbsp;"timestamp",
&nbsp;&nbsp;//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;"filter":&nbsp;"log_level:"ERROR"",
&nbsp;&nbsp;//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;"interval":&nbsp;1800
&nbsp;&nbsp;//&nbsp;&nbsp;&nbsp;&nbsp;}],
&nbsp;&nbsp;ar.RuleConfig&nbsp;=&nbsp;string(b)
&nbsp;&nbsp;ar.PromQl&nbsp;=&nbsp;""
&nbsp;}

&nbsp;//......

&nbsp;return&nbsp;nil
}

基于FE2DB生成数据映射对象后,就需要进行持久化,按照笔者的说法该模型会调用内置的Add将结构体持久化,对应的方法也位于alert_rule.go文件的Add

func&nbsp;(ar&nbsp;*AlertRule)&nbsp;Add(ctx&nbsp;*ctx.Context)&nbsp;error&nbsp;{
&nbsp;
&nbsp;
&nbsp;
&nbsp;now&nbsp;:=&nbsp;time.Now().Unix()
&nbsp;ar.CreateAt&nbsp;=&nbsp;now
&nbsp;ar.UpdateAt&nbsp;=&nbsp;now
&nbsp;
&nbsp;return&nbsp;Insert(ctx,&nbsp;ar)
}

最终我们也可以在alert_rule看到这份信息:

轮询监控告警工作过程

完成的查询的任务创建之后,就到了Nightingale中最重要的一环,即基于规则进行告警监控,这个过程本质就是上述两个步骤的综合配置结果,即通过规则配置和插入,结合用户调测配置的查询规则进行周期性轮询并,针对查到的数据进行阈值判断,一旦发现结果超出阈值,则将结果写入缓存队列中,后续各种告警手段都会基于这个缓存进行告警输出。

这里笔者也给出Nightingale告警的宏观流程:

  1. 基于之前写入数据库且缓存的rule对象生成job提交到调取器scheduler
  2. scheduler定期执行这个job
  3. jobelasticsearch发起请求获取结果
  4. 将触发阈值的结果封装成DataResp写入缓存seriesStore
  5. 遍历seriesStore将其封装成事件写入一个协程安全的队列eventQueue
  6. 后续Nightingale就会基于这个队列缓存进行数据库持久化或者告警操作

对应的我们先给出从缓存中获取rule并将其提交到调度器scheduler的代码,即位于alert_rule.go下的syncAlertRules函数,逻辑比较简单,从alertRuleCache拉取到规则后调用NewAlertRuleWorker提交到调度器即可:

func&nbsp;(s&nbsp;*Scheduler)&nbsp;syncAlertRules()&nbsp;{
&nbsp;
&nbsp;ids&nbsp;:=&nbsp;s.alertRuleCache.GetRuleIds()&nbsp;
&nbsp;
&nbsp;
&nbsp;for&nbsp;_,&nbsp;id&nbsp;:=&nbsp;range&nbsp;ids&nbsp;{&nbsp;
&nbsp;&nbsp;rule&nbsp;:=&nbsp;s.alertRuleCache.Get(id)
&nbsp;&nbsp;

&nbsp;&nbsp;ruleType&nbsp;:=&nbsp;rule.GetRuleType()
&nbsp;&nbsp;if&nbsp;rule.IsPrometheusRule()&nbsp;||&nbsp;rule.IsInnerRule()&nbsp;{
&nbsp;&nbsp;&nbsp;&nbsp;
&nbsp;&nbsp;&nbsp;for&nbsp;_,&nbsp;dsId&nbsp;:=&nbsp;range&nbsp;datasourceIds&nbsp;{
&nbsp;&nbsp;&nbsp;&nbsp;
&nbsp;&nbsp;&nbsp;&nbsp;
&nbsp;&nbsp;&nbsp;&nbsp;alertRule&nbsp;:=&nbsp;NewAlertRuleWorker(rule,&nbsp;dsId,&nbsp;processor,&nbsp;s.promClients,&nbsp;s.ctx)&nbsp;
&nbsp;&nbsp;&nbsp;&nbsp;alertRuleWorkers[alertRule.Hash()]&nbsp;=&nbsp;alertRule
&nbsp;&nbsp;&nbsp;}
&nbsp;&nbsp;}&nbsp;else&nbsp;if&nbsp;rule.IsHostRule()&nbsp;{
&nbsp;&nbsp;&nbsp;
&nbsp;&nbsp;}&nbsp;else&nbsp;{
&nbsp;&nbsp;&nbsp;
&nbsp;&nbsp;}
&nbsp;}

后续就会有一个协程定时调用eval.go的Eval方法,通过GetAnomalyPoint发起es查询,如果存在异常端点事件,则将其通过Processor.Handle封装成event缓存起来,等待Nightingale底层各种协程进行持久化、告警操作,对应笔者给出这几个步骤的核心代码断,读者可以结合注释了解一下:

func&nbsp;(arw&nbsp;*AlertRuleWorker)&nbsp;Eval()&nbsp;{
&nbsp;
&nbsp;
&nbsp;switch&nbsp;typ&nbsp;{
&nbsp;case&nbsp;models.PROMETHEUS:
&nbsp;
&nbsp;default:
&nbsp;&nbsp;
&nbsp;&nbsp;anomalyPoints,&nbsp;recoverPoints,&nbsp;err&nbsp;=&nbsp;arw.GetAnomalyPoint(cachedRule,&nbsp;arw.Processor.DatasourceId())
&nbsp;}

&nbsp;

&nbsp;

&nbsp;
&nbsp;}&nbsp;else&nbsp;{
&nbsp;
&nbsp;}
&nbsp;
&nbsp;arw.Processor.Handle(anomalyPoints,&nbsp;"inner",&nbsp;arw.Inhibit)
}

对应的笔者也给出GetAnomalyPoint的实现细节,逻辑比较简单,整体就是触发es查询即QueryData(上文告警规则调测介绍过,其底层es查询的逻辑),然后将其写入seriesStore中,然后结合rule对象通过parser.CalcWithRid查看结果是否触发阈值,如果触发则存入异常端点的切片points中返回:

func&nbsp;(arw&nbsp;*AlertRuleWorker)&nbsp;GetAnomalyPoint(rule&nbsp;*models.AlertRule,&nbsp;dsId&nbsp;int64)&nbsp;([]models.AnomalyPoint,&nbsp;[]models.AnomalyPoint,&nbsp;error)&nbsp;{
&nbsp;

&nbsp;&nbsp;&nbsp;
&nbsp;&nbsp;&nbsp;series,&nbsp;err&nbsp;:=&nbsp;plug.QueryData(ctx,&nbsp;query)
&nbsp;&nbsp;&nbsp;
&nbsp;&nbsp;

&nbsp;&nbsp;
&nbsp;&nbsp;&nbsp;for&nbsp;i&nbsp;:=&nbsp;0;&nbsp;i&nbsp;<&nbsp;len(series);&nbsp;i++&nbsp;{
&nbsp;&nbsp;&nbsp;&nbsp;
&nbsp;&nbsp;&nbsp;&nbsp;serieHash&nbsp;:=&nbsp;hash.GetHash(series[i].Metric,&nbsp;series[i].Ref)
&nbsp;&nbsp;&nbsp;&nbsp;
&nbsp;&nbsp;&nbsp;&nbsp;tagHash&nbsp;:=&nbsp;hash.GetTagHash(series[i].Metric)
&nbsp;&nbsp;&nbsp;&nbsp;
&nbsp;&nbsp;&nbsp;&nbsp;seriesStore[serieHash]&nbsp;=&nbsp;series[i]

&nbsp;&nbsp;&nbsp;&nbsp;
&nbsp;&nbsp;&nbsp;&nbsp;
&nbsp;&nbsp;&nbsp;&nbsp;seriesTagIndex[tagHash]&nbsp;=&nbsp;append(seriesTagIndex[tagHash],&nbsp;serieHash)&nbsp;
&nbsp;&nbsp;&nbsp;}
&nbsp;&nbsp;&nbsp;
&nbsp;&nbsp;}

&nbsp;&nbsp;

&nbsp;&nbsp;if&nbsp;!ruleQuery.ExpTriggerDisable&nbsp;{
&nbsp;&nbsp;&nbsp;for&nbsp;_,&nbsp;trigger&nbsp;:=&nbsp;range&nbsp;ruleQuery.Triggers&nbsp;{
&nbsp;&nbsp;&nbsp;&nbsp;
&nbsp;&nbsp;&nbsp;&nbsp;
&nbsp;&nbsp;&nbsp;&nbsp;for&nbsp;_,&nbsp;seriesHash&nbsp;:=&nbsp;range&nbsp;seriesTagIndex&nbsp;{
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;isTriggered&nbsp;:=&nbsp;parser.CalcWithRid(trigger.Exp,&nbsp;m,&nbsp;rule.Id)
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;point&nbsp;:=&nbsp;models.AnomalyPoint{
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Key:&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;sample.MetricName(),
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Labels:&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;sample.Metric,
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Timestamp:&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;int64(ts),
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Value:&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;value,
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Values:&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;values,
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Severity:&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;trigger.Severity,
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Triggered:&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;isTriggered,
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Query:&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;fmt.Sprintf("query:%+v&nbsp;trigger:%+v",&nbsp;queries,&nbsp;trigger),
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;RecoverConfig:&nbsp;trigger.RecoverConfig,
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;ValuesUnit:&nbsp;&nbsp;&nbsp;&nbsp;valuesUnitMap,
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;isTriggered&nbsp;{
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;points&nbsp;=&nbsp;append(points,&nbsp;point)
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;else&nbsp;{
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}
&nbsp;&nbsp;&nbsp;&nbsp;}
&nbsp;&nbsp;&nbsp;}
&nbsp;&nbsp;}

&nbsp;&nbsp;
&nbsp;
&nbsp;return&nbsp;points,&nbsp;recoverPoints,&nbsp;nil
}

最后就是事件告警了,逻辑比较简单,基于上一步的异常端点anomalyPoints遍历生成event写入缓存中:

func&nbsp;(p&nbsp;*Processor)&nbsp;Handle(anomalyPoints&nbsp;[]models.AnomalyPoint,&nbsp;from&nbsp;string,&nbsp;inhibit&nbsp;bool)&nbsp;{
&nbsp;&nbsp;

&nbsp;
&nbsp;eventsMap&nbsp;:=&nbsp;make(map[string][]*models.AlertCurEvent)
&nbsp;for&nbsp;_,&nbsp;anomalyPoint&nbsp;:=&nbsp;range&nbsp;anomalyPoints&nbsp;{
&nbsp;&nbsp;
&nbsp;&nbsp;event&nbsp;:=&nbsp;p.BuildEvent(anomalyPoint,&nbsp;from,&nbsp;now,&nbsp;ruleHash)&nbsp;
&nbsp;&nbsp;

&nbsp;&nbsp;

&nbsp;&nbsp;
&nbsp;&nbsp;}

&nbsp;&nbsp;
&nbsp;&nbsp;
&nbsp;&nbsp;tagHash&nbsp;:=&nbsp;TagHash(anomalyPoint)
&nbsp;&nbsp;
&nbsp;&nbsp;eventsMap[tagHash]&nbsp;=&nbsp;append(eventsMap[tagHash],&nbsp;event)
&nbsp;}
&nbsp;
&nbsp;for&nbsp;_,&nbsp;events&nbsp;:=&nbsp;range&nbsp;eventsMap&nbsp;{
&nbsp;&nbsp;p.handleEvent(events)
&nbsp;}

&nbsp;&nbsp;
}

小结

本文详细的介绍了Nightingale的日志告警规则配置和实现细节,基于这个契机,笔者也来谈谈读者常问的一个问题——如何较好的去掌握一门技术,作为一个计算机从业者,私以为学习一门技术的本质无非是想尽一切去了解它,让自己拥有白盒的视角去看待这些原本黑盒的技术。

以本次Nightingale这个告警框架为例,读者可以非常直观的看到笔者的学习过程,本质上就是:

  1. 结合一手官网的文档去学习和应用
  2. 通过这些应用找到请求入口
  3. 结合入口定位到源码入口并针对每个接口的实现细节进行阅读和具象化梳理

最终读者眼中的这些工具,在笔者眼里就变为一个http请求在go应用框架的协程中的各种数据流扭转和es交互请求和响应,后续无论是运用还是问题排查也都是以这种视角游刃有余的去使用和排查。