✍ 道路千万条,安全第一条。操作不规范,运维两行泪。

前言

目前,大部分公司都采用Prometheus + Grafana这一套来做指标监控,所以在Prometheus中也有大量的指标数据。为了满足日常工作中的巡检,可以基于Prometheus实现自动巡检,减轻部分运维压力。

思路

为了灵活管理巡检任务,将整个巡检功能进行了拆分管理,分为:

  • 数据源管理:可以管理多个Prometheus数据源,后期也可以增加其他数据源,比如ES等。

  • 巡检项管理:目前的巡检项就是各种Prometheus规则,之所以要单独进行管理,是为了在多数据源、多集群等情况下进行复用。

  • 标签管理:目前是Prometheuslabel,也是为了方便复用巡检项巡检项标签可以灵活进行组合。

  • 任务编排:编排各种巡检任务。

  • 执行作业:配置定时的巡检作业,它由多个编排的任务组成。

  • 巡检报告:便于查看、导出巡检结果。

  • 巡检通知:巡检结果可以通知到企业微信群,便于业务方快速知道目前整个系统有没有问题。

效果

数据源管理

(1)添加数据源

(2)数据源列表

巡检项管理

(1)添加巡检项

(2)巡检项列表

标签管理

(1)添加标签

(2)标签列表

任务编排

(1)创建任务编排

(2)任务列表

执行作业

(1)创建执行作业

(2)作业列表

巡检报告

每次巡检完成都会生成对应的巡检报告。

点击详情可以看到巡检的具体结果。

点击导出,即可将报告导出为PDF。

如果配置了巡检通知,则会将对应的巡检结果发送到企业微信群。

代码实现

大部分的代码都是普通的CRUD,比如数据源的管理、巡检项的管理都是基础的CRUD,没有什么好说的。

这里简单说一下具体巡检的实现。

(1)当用户创建了执行作业且该作业处于开启状态,就会创建一个定时任务。

// CreateCronTask 创建定时任务

func (inspectionExecutionJobService *InspectionExecutionJobService) CreateCronTask(job *AutoInspection.InspectionExecutionJob) error {
    cronName := fmt.Sprintf("InspectionExecution_%d", job.ID)
    taskName := fmt.Sprintf("InspectionExecution_%d", job.ID)
    // 检查是否已存在相同的定时任务
    if _, found := global.GVA_Timer.FindTask(cronName, taskName); found {
        // 如果已存在,先清除旧的定时任务
        global.GVA_Timer.Clear(cronName)
    }
    // 创建定时任务
    var option []cron.Option
    option = append(option, cron.WithSeconds())
    // 添加定时任务
    if _, err := global.GVA_Timer.AddTaskByFunc(cronName, job.CronExpr, func() {
        // 执行巡检任务
        inspectionExecutionJobService.ExecuteInspectionJob(job)
    }, taskName, option...); err != nil {
        global.GVA_LOG.Error("创建定时任务失败", zap.Error(err), zap.Uint("jobID", job.ID))
        return err
    }
    // 更新下次执行时间
    nextTime := inspectionExecutionJobService.calculateNextRunTime(job.CronExpr)
    job.NextRunTime = &nextTime
    // 更新数据库中的记录
    return global.GVA_DB.Model(job).Updates(map[string]interface{}{
        "next_run_time": job.NextRunTime,
    }).Error
}

Tips:因为是采用的gin-vue-admin框架,所以直接使用框架自带的timer定时器。

(2)当执行时间到了,就会执行ExecuteInspectionJob巡检任务。

func (inspectionExecutionJobService *InspectionExecutionJobService) ExecuteInspectionJob(job *AutoInspection.InspectionExecutionJob) {
    // 更新作业执行时间
    inspectionExecutionJobService.updateJobExecutionTime(job)
    // 创建执行记录
    jobExecution := inspectionExecutionJobService.createJobExecution(job)
    if jobExecution == nil {
        return
    }
    // 执行所有关联的巡检任务并收集结果
    allResults := inspectionExecutionJobService.executeAllInspectionTasks(job, jobExecution)
    global.GVA_LOG.Info("执行完成", zap.Any("results", allResults))
    // 更新执行记录状态和结果
    inspectionExecutionJobService.updateJobExecutionResult(jobExecution, allResults)
    // 发送通知
    if *job.IsNotice {
        inspectionExecutionJobService.sendInspectionNotification(job, jobExecution, allResults)
    }
}

这里主要是executeAllInspectionTasks来执行巡检任务。

// executeAllInspectionTasks 执行所有关联的巡检任务并收集结果

func (inspectionExecutionJobService *InspectionExecutionJobService) executeAllInspectionTasks(job *AutoInspection.InspectionExecutionJob, jobExecution *AutoInspection.JobExecution) []*result.ProductsResult {
    // 创建一个等待组来同步所有巡检任务
    var wg sync.WaitGroup
    // 创建一个互斥锁来保护结果集
    var mu sync.Mutex
    // 创建一个结果集合
    allResults := make([]*result.ProductsResult, 0)
    // 执行所有关联的巡检任务
    for _, jobID := range job.JobIds {
        wg.Add(1)
        go func(id uint) {
            defer wg.Done()
            // 执行单个巡检任务并获取结果
            result := inspectionExecutionJobService.executeSingleInspectionTask(id, jobExecution)
            if result != nil {
                // 将结果添加到总结果集中
                mu.Lock()
                allResults = append(allResults, result)
                mu.Unlock()
            }
        }(jobID)
    }
    // 等待所有巡检任务完成
    wg.Wait()
    return allResults
}

它会把作业中的任务拆成单个任务,然后由executeSingleInspectionTask分别执行并收集执行结果。

// executeSingleInspectionTask 执行单个巡检任务
func (inspectionExecutionJobService *InspectionExecutionJobService) executeSingleInspectionTask(jobID uint, jobExecution *AutoInspection.JobExecution) *result.ProductsResult {
    global.GVA_LOG.Info("执行巡检任务", zap.Uint("jobID", jobID))
    // 获取巡检任务信息
    inspectionJob, _ := inspectionJobService.GetInspectionJob(fmt.Sprintf("%d", jobID))
    // 创建结果通道
    resultCh := make(chan *result.ProductsResult)
    // 创建一个用于等待结果的WaitGroup
    var resultWg sync.WaitGroup
    resultWg.Add(1)
    // 用于存储结果的变量
    var taskResult *result.ProductsResult
    // 启动一个goroutine来接收结果
    go func() {
        defer resultWg.Done()
        result := <-resultCh
        global.GVA_LOG.Info("巡检任务执行完成",
            zap.String("jobName", inspectionJob.Name),
            zap.Any("result", result))
        // 保存结果
        taskResult = result
    }()
    // 执行巡检任务
    inspectionExecutionJobService.ExecuteInspectionTask(&inspectionJob, jobExecution, resultCh)
    // 等待结果接收完成
    resultWg.Wait()
    return taskResult

}

ExecuteInspectionTask中是为了方便扩展数据源。

func (inspectionExecutionJobService *InspectionExecutionJobService) ExecuteInspectionTask(inspectionJob *AutoInspection.InspectionJob, jobExecution *AutoInspection.JobExecution, resultCh chan *result.ProductsResult) {

    switch inspectionJob.DataSourceType {
    case "prometheus":
        // 执行Prometheus巡检任务
        inspectionExecutionJobService.ExecutePrometheusInspectionTask(inspectionJob, jobExecution, resultCh)
    }
}

由于目前只有Prometheus数据源,所以将直接执行ExecutePrometheusInspectionTask。在这个方法中主要是构造Prometheus规则然后进行巡检。

// ExecutePrometheusInspectionTask 执行Prometheus巡检任务
func (inspectionExecutionJobService *InspectionExecutionJobService) ExecutePrometheusInspectionTask(inspectionJob *AutoInspection.InspectionJob, jobExecution *AutoInspection.JobExecution, resultCh chan *result.ProductsResult) {
 // 执行Prometheus巡检任务的逻辑
 var inspectionItemsService InspectionItemsService
 var inspectionTagService InspectionTagService
 var dataSourceService DataSourceService

 // 获取数据源信息
 dataSource, _ := dataSourceService.GetDataSource(fmt.Sprintf("%d", inspectionJob.DataSourceId))

 // 创建规则列表
 prometheusRules := make([]*product.PrometheusRule, 0, len(inspectionJob.ItemLabelMaps))

 // 遍历巡检项与标签映射关系
 for _, itemLabelMap := range inspectionJob.ItemLabelMaps {
  // 获取巡检项信息
  inspectionItem, _ := inspectionItemsService.GetInspectionItems(fmt.Sprintf("%d", itemLabelMap.ItemId))

  // 获取标签信息
  var inspectionTag AutoInspection.InspectionTag
  if itemLabelMap.LabelId != 0 {
   inspectionTag, _ = inspectionTagService.GetInspectionTag(fmt.Sprintf("%d", itemLabelMap.LabelId))
  }

  // 创建Prometheus规则
  prometheusRule := &product.PrometheusRule{
   Name:           inspectionItem.Name,
   Rule:           inspectionItem.Rule,
   LabelFilter:    inspectionTag.Label,
   Desc:           inspectionItem.Description,
   AlertInfo:      inspectionItem.OutputTemplate,
   DataSourceName: dataSource.Name,
  }

  // 添加到规则列表
  prometheusRules = append(prometheusRules, prometheusRule)

 }

 // 创建规则集合
 rules := product.Rules{
  Prometheus: prometheusRules,
  AliyunSafe: []*product.AliyunSafeRule{}, // 空列表,因为这里只处理Prometheus规则
 }

 // 创建产品
 prod := &product.Product{
  Name:  inspectionJob.Name,
  Rules: rules,
 }

 // 使用defer和recover捕获可能的panic
 defer func() {
  if r := recover(); r != nil {
   // 记录panic信息
   global.GVA_LOG.Error("执行巡检任务发生panic",
    zap.Any("panic", r),
    zap.String("jobName", inspectionJob.Name))

   // 创建一个表示失败的结果并发送到结果通道
   pr := &result.ProductsResult{ProductName: inspectionJob.Name}

   // 为每个规则创建失败结果
   for _, rule := range prometheusRules {
    errorMsg := fmt.Sprintf("巡检执行失败: %v", r)
    failureResult := result.NewRuleResult(
     result.WithInspectionInfo(rule.Name),
     result.WithInspectionResult(result.ABNORMAL),
     result.WithInspectionErrorInfo(
      []map[string]string{{
       "error": errorMsg,
       "rule":  rule.Rule,
      }},
      "执行规则 {{rule}} 时发生错误: {{error}}",
     ),
    )
    pr.Add(failureResult)
   }

   // 发送结果
   resultCh <- pr
  }
 }()

 // 执行巡检
 err = prod.Run(resultCh)
 if err != nil {
  global.GVA_LOG.Error("执行巡检任务失败", zap.Error(err), zap.String("jobName", inspectionJob.Name))
  return
 }

 global.GVA_LOG.Info("巡检任务已启动", zap.String("jobName", inspectionJob.Name))
}

prod.Run中,会去做真正的指标数据查询。

func (p *Product) Run(resultCh chan *result.ProductsResult) error {
 global.GVA_LOG.Info(fmt.Sprintf("开始巡检, %s", p.Name))
 pr := &result.ProductsResult{ProductName: p.Name}
 // prometheus巡检规则
 for _, prometheusRule := range p.Rules.Prometheus {
  ruleInspectRes, err := prometheusRule.Run()
  if err != nil {
   return err
  }
  pr.Add(ruleInspectRes)
 }
 resultCh <- pr
 return nil
}

然后调用prometheusRule.Run获取结果。

func (r *PrometheusRule) Run() (*result.RuleResult, error) {
 ds, err := datasource.GetByName(r.DataSourceName)
 if err != nil {
  return nil, err
 }
 pds, ok := ds.(*datasource.PrometheusDataSource)
 if !ok {
  return nil, fmt.Errorf("数据源类型错误: %s 不是Prometheus数据源", r.DataSourceName)
 }
 if pds.Client == nil {
  return nil, fmt.Errorf("数据源为空: %s", r.DataSourceName)
 }
 res, err := pds.Run(r.Rule, r.LabelFilter)
 if err != nil {
  return nil, err
 }
 ruleRes := r.buildRuleResult(res)
 return ruleRes, nil
}

func (r *PrometheusRule) buildRuleResult(resultLabels []map[string]string) *result.RuleResult {
 if len(resultLabels) == 0 {
  return result.NewRuleResult(result.WithInspectionInfo(fmt.Sprintf("%s", r.Name)),
   result.WithInspectionResult(result.NORMAL))
 }
 return result.NewRuleResult(result.WithInspectionInfo(fmt.Sprintf("%s", r.Name)),
  result.WithInspectionResult(result.ABNORMAL),
  result.WithInspectionErrorInfo(resultLabels, r.AlertInfo))
}

具体的查询是封装在pds.Run中的,它会去调用Prometheus的接口去查询数据。

func Query(client api.Client, rule string) (model.Value, []string, error) {
 // 添加空指针检查
 if client == nil {
  return nil, nil, errors.New("Prometheus client is nil")
 }
 v1Api := promV1.NewAPI(client)
 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 defer cancel()
 value, warnings, err := v1Api.Query(ctx, rule, time.Now(), promV1.WithTimeout(10*time.Second))
 global.GVA_LOG.Debug("查询结果", zap.String("value", value.String()), zap.Any("warnings", warnings))
 if err != nil {
  return nil, nil, errors.WithStack(err)
 }
 return value, warnings, nil
}

(3)如果需要发送到企业微信,就会构建发送结果进行发送。

func (inspectionExecutionJobService *InspectionExecutionJobService) sendInspectionNotification(job *AutoInspection.InspectionExecutionJob, jobExecution *AutoInspection.JobExecution, results []*result.ProductsResult) {
 // 获取通知配置
 var notifyService = NotifyService{}
 notify, err := notifyService.GetNotify(fmt.Sprintf("%d", job.NoticdId))
 if err != nil {
  global.GVA_LOG.Error("获取通知配置失败", zap.Error(err))
  return
 }

 // 构建通知内容
 // 1. 巡检摘要
 taskCount := len(results)   // 巡检任务数量
 itemCount := 0              // 巡检项数量
 normalCount := 0            // 正常项数量
 abnormalCount := 0          // 异常项数量
 abnormalItems := []string{} // 异常项列表

 // 统计巡检项、正常项和异常项的数量
 for _, task := range results {
  itemCount += len(task.SubRuleResults)
  for _, item := range task.SubRuleResults {
   if item.InspectionResult == result.NORMAL {
    normalCount++
   } else if item.InspectionResult == result.ABNORMAL {
    abnormalCount++
    // 收集异常项信息
    abnormalDetail := fmt.Sprintf("【%s】%s", task.ProductName, item.InspectionInfo)
    if len(item.InspectionErrorInfo) > 0 {
     abnormalDetail += "\n" + strings.Join(item.InspectionErrorInfo, "\n")
    }
    abnormalItems = append(abnormalItems, abnormalDetail)
   }
  }
 }

 // 格式化摘要信息
 summary := fmt.Sprintf("巡检任务%d个,巡检项%d个,正常%d个,异常%d个", taskCount, itemCount, normalCount, abnormalCount)

 // 构建企业微信通知内容
 var content string
 if notify.TemplateType == "markdown" {
  // Markdown格式
  content = fmt.Sprintf(`{
   "msgtype": "markdown",
   "markdown": {
    "content": "# 自动化巡检结果通知\n\n> ### 执行作业:%s\n> ### 执行时间:%s\n> ### 执行结果:%s\n\n### **异常项列表:**\n%s"
   }
  }`,
   jobExecution.ExecutionJobName,
   jobExecution.EndTime.Format("2006-01-02 15:04:05"),
   summary,
   formatAbnormalItems(abnormalItems))
 } else {
  // 文本格式
  content = fmt.Sprintf(`{
   "msgtype": "text",
   "text": {
    "content": "巡检结果通知\n执行作业:%s\n执行时间:%s\n执行结果:%s\n\n异常项列表:\n%s"
   }
  }`,
   jobExecution.ExecutionJobName,
   jobExecution.EndTime.Format("2006-01-02 15:04:05"),
   summary,
   formatAbnormalItemsText(abnormalItems))
 }

 // 发送通知
 ctx := context.Background()
 sendParams := sender.SendParams{
  NoticeType: notify.Type,
  NoticeId:   fmt.Sprintf("%d", notify.ID),
  NoticeName: notify.Name,
  Hook:       notify.Address,
  Content:    content,
 }

 err = sender.Sender(&ctx, sendParams)
 if err != nil {
  global.GVA_LOG.Error("发送巡检通知失败", zap.Error(err))
  return
 }

 global.GVA_LOG.Info("发送巡检通知成功",
  zap.String("jobName", jobExecution.ExecutionJobName),
  zap.String("summary", summary))
}

(4)PDF导出是用wkhtmltopdf实现,该包依赖服务器上的wkhtmltopdf命令。

func (jobExecutionService *JobExecutionService) GeneratePDF(jobExecution *AutoInspection.JobExecution) (string, error) {

 pdf, err := wkhtmltopdf.NewPDFGenerator()
 if err != nil {
  global.GVA_LOG.Error("PDF生成器初始化失败", zap.Error(err))
  return "", err
 }

 // 设置全局选项
 pdf.Dpi.Set(300)
 pdf.Orientation.Set(wkhtmltopdf.OrientationPortrait)
 pdf.PageSize.Set(wkhtmltopdf.PageSizeA4)
 pdf.MarginTop.Set(20)
 pdf.MarginBottom.Set(20)
 pdf.MarginLeft.Set(20)
 pdf.MarginRight.Set(20)

 // 渲染HTML模板
 htmlContent, err := jobExecutionService.renderHTMLTemplate(jobExecution)
 if err != nil {
  global.GVA_LOG.Error("HTML模板渲染失败", zap.Error(err))
  return "", err
 }

 // 创建一个页面并添加到生成器
 page := wkhtmltopdf.NewPageReader(bytes.NewBufferString(htmlContent))
 pdf.AddPage(page)

 // 生成PDF
 err = pdf.Create()
 if err != nil {
  return "", err
 }

 basePath := "uploads/pdf"
 // 创建目录(如果不存在)
 if err = os.MkdirAll(basePath, 0755); err != nil {
  global.GVA_LOG.Error("创建PDF保存目录失败", zap.Error(err))
  return "", err
 }

 filename := generatePDFFileName(jobExecution)
 filePath := filepath.Join(basePath, filename)

 // 3. 保存PDF到文件
 if err = os.WriteFile(filePath, pdf.Bytes(), 0644); err != nil {
  global.GVA_LOG.Error("保存PDF文件失败", zap.Error(err))
  return "", err
 }

    ....
    
 return downloadURL, nil
}

以上就是实现巡检的主要代码。

最后

大部分企业虽然都有监控告警,但是自动化巡检在日常的运维工作中还是必要的,它可以聚合目前系统、集群存在的问题,避免遗漏告警信息。另外,在AI发展迅猛的今天,可以把AI也结合到自动化巡检中,比如在巡检中增加一些AI预测,AI故障诊断、AI根因分析等功能。