
基于Prometheus的自动化巡检平台
✍ 道路千万条,安全第一条。操作不规范,运维两行泪。
前言
目前,大部分公司都采用Prometheus + Grafana
这一套来做指标监控,所以在Prometheus
中也有大量的指标数据。为了满足日常工作中的巡检,可以基于Prometheus
实现自动巡检,减轻部分运维压力。
思路
为了灵活管理巡检任务,将整个巡检功能进行了拆分管理,分为:
数据源管理
:可以管理多个Prometheus
数据源,后期也可以增加其他数据源,比如ES
等。巡检项管理
:目前的巡检项
就是各种Prometheus规则
,之所以要单独进行管理,是为了在多数据源、多集群等情况下进行复用。标签管理
:目前是Prometheus
的label
,也是为了方便复用巡检项
,巡检项
和标签
可以灵活进行组合。任务编排
:编排各种巡检任务。执行作业
:配置定时的巡检作业,它由多个编排的任务组成。巡检报告
:便于查看、导出巡检结果。巡检通知
:巡检结果可以通知到企业微信
群,便于业务方快速知道目前整个系统有没有问题。
效果
数据源管理
(1)添加数据源
(2)数据源列表
巡检项管理
(1)添加巡检项
(2)巡检项列表
标签管理
(1)添加标签
(2)标签列表
任务编排
(1)创建任务编排
(2)任务列表
执行作业
(1)创建执行作业
(2)作业列表
巡检报告
每次巡检完成都会生成对应的巡检报告。
点击详情可以看到巡检的具体结果。
点击导出,即可将报告导出为PDF。
如果配置了巡检通知,则会将对应的巡检结果发送到企业微信群。
代码实现
大部分的代码都是普通的CRUD
,比如数据源的管理、巡检项的管理都是基础的CRUD
,没有什么好说的。
这里简单说一下具体巡检的实现。
(1)当用户创建了执行作业
且该作业处于开启
状态,就会创建一个定时任务。
// CreateCronTask 创建定时任务
func (inspectionExecutionJobService *InspectionExecutionJobService) CreateCronTask(job *AutoInspection.InspectionExecutionJob) error {
cronName := fmt.Sprintf("InspectionExecution_%d", job.ID)
taskName := fmt.Sprintf("InspectionExecution_%d", job.ID)
// 检查是否已存在相同的定时任务
if _, found := global.GVA_Timer.FindTask(cronName, taskName); found {
// 如果已存在,先清除旧的定时任务
global.GVA_Timer.Clear(cronName)
}
// 创建定时任务
var option []cron.Option
option = append(option, cron.WithSeconds())
// 添加定时任务
if _, err := global.GVA_Timer.AddTaskByFunc(cronName, job.CronExpr, func() {
// 执行巡检任务
inspectionExecutionJobService.ExecuteInspectionJob(job)
}, taskName, option...); err != nil {
global.GVA_LOG.Error("创建定时任务失败", zap.Error(err), zap.Uint("jobID", job.ID))
return err
}
// 更新下次执行时间
nextTime := inspectionExecutionJobService.calculateNextRunTime(job.CronExpr)
job.NextRunTime = &nextTime
// 更新数据库中的记录
return global.GVA_DB.Model(job).Updates(map[string]interface{}{
"next_run_time": job.NextRunTime,
}).Error
}
Tips:因为是采用的
gin-vue-admin
框架,所以直接使用框架自带的timer定时器。
(2)当执行时间到了,就会执行ExecuteInspectionJob
巡检任务。
func (inspectionExecutionJobService *InspectionExecutionJobService) ExecuteInspectionJob(job *AutoInspection.InspectionExecutionJob) {
// 更新作业执行时间
inspectionExecutionJobService.updateJobExecutionTime(job)
// 创建执行记录
jobExecution := inspectionExecutionJobService.createJobExecution(job)
if jobExecution == nil {
return
}
// 执行所有关联的巡检任务并收集结果
allResults := inspectionExecutionJobService.executeAllInspectionTasks(job, jobExecution)
global.GVA_LOG.Info("执行完成", zap.Any("results", allResults))
// 更新执行记录状态和结果
inspectionExecutionJobService.updateJobExecutionResult(jobExecution, allResults)
// 发送通知
if *job.IsNotice {
inspectionExecutionJobService.sendInspectionNotification(job, jobExecution, allResults)
}
}
这里主要是executeAllInspectionTasks
来执行巡检任务。
// executeAllInspectionTasks 执行所有关联的巡检任务并收集结果
func (inspectionExecutionJobService *InspectionExecutionJobService) executeAllInspectionTasks(job *AutoInspection.InspectionExecutionJob, jobExecution *AutoInspection.JobExecution) []*result.ProductsResult {
// 创建一个等待组来同步所有巡检任务
var wg sync.WaitGroup
// 创建一个互斥锁来保护结果集
var mu sync.Mutex
// 创建一个结果集合
allResults := make([]*result.ProductsResult, 0)
// 执行所有关联的巡检任务
for _, jobID := range job.JobIds {
wg.Add(1)
go func(id uint) {
defer wg.Done()
// 执行单个巡检任务并获取结果
result := inspectionExecutionJobService.executeSingleInspectionTask(id, jobExecution)
if result != nil {
// 将结果添加到总结果集中
mu.Lock()
allResults = append(allResults, result)
mu.Unlock()
}
}(jobID)
}
// 等待所有巡检任务完成
wg.Wait()
return allResults
}
它会把作业
中的任务拆成单个任务,然后由executeSingleInspectionTask
分别执行并收集执行结果。
// executeSingleInspectionTask 执行单个巡检任务
func (inspectionExecutionJobService *InspectionExecutionJobService) executeSingleInspectionTask(jobID uint, jobExecution *AutoInspection.JobExecution) *result.ProductsResult {
global.GVA_LOG.Info("执行巡检任务", zap.Uint("jobID", jobID))
// 获取巡检任务信息
inspectionJob, _ := inspectionJobService.GetInspectionJob(fmt.Sprintf("%d", jobID))
// 创建结果通道
resultCh := make(chan *result.ProductsResult)
// 创建一个用于等待结果的WaitGroup
var resultWg sync.WaitGroup
resultWg.Add(1)
// 用于存储结果的变量
var taskResult *result.ProductsResult
// 启动一个goroutine来接收结果
go func() {
defer resultWg.Done()
result := <-resultCh
global.GVA_LOG.Info("巡检任务执行完成",
zap.String("jobName", inspectionJob.Name),
zap.Any("result", result))
// 保存结果
taskResult = result
}()
// 执行巡检任务
inspectionExecutionJobService.ExecuteInspectionTask(&inspectionJob, jobExecution, resultCh)
// 等待结果接收完成
resultWg.Wait()
return taskResult
}
在ExecuteInspectionTask
中是为了方便扩展数据源。
func (inspectionExecutionJobService *InspectionExecutionJobService) ExecuteInspectionTask(inspectionJob *AutoInspection.InspectionJob, jobExecution *AutoInspection.JobExecution, resultCh chan *result.ProductsResult) {
switch inspectionJob.DataSourceType {
case "prometheus":
// 执行Prometheus巡检任务
inspectionExecutionJobService.ExecutePrometheusInspectionTask(inspectionJob, jobExecution, resultCh)
}
}
由于目前只有Prometheus
数据源,所以将直接执行ExecutePrometheusInspectionTask
。在这个方法中主要是构造Prometheus
规则然后进行巡检。
// ExecutePrometheusInspectionTask 执行Prometheus巡检任务
func (inspectionExecutionJobService *InspectionExecutionJobService) ExecutePrometheusInspectionTask(inspectionJob *AutoInspection.InspectionJob, jobExecution *AutoInspection.JobExecution, resultCh chan *result.ProductsResult) {
// 执行Prometheus巡检任务的逻辑
var inspectionItemsService InspectionItemsService
var inspectionTagService InspectionTagService
var dataSourceService DataSourceService
// 获取数据源信息
dataSource, _ := dataSourceService.GetDataSource(fmt.Sprintf("%d", inspectionJob.DataSourceId))
// 创建规则列表
prometheusRules := make([]*product.PrometheusRule, 0, len(inspectionJob.ItemLabelMaps))
// 遍历巡检项与标签映射关系
for _, itemLabelMap := range inspectionJob.ItemLabelMaps {
// 获取巡检项信息
inspectionItem, _ := inspectionItemsService.GetInspectionItems(fmt.Sprintf("%d", itemLabelMap.ItemId))
// 获取标签信息
var inspectionTag AutoInspection.InspectionTag
if itemLabelMap.LabelId != 0 {
inspectionTag, _ = inspectionTagService.GetInspectionTag(fmt.Sprintf("%d", itemLabelMap.LabelId))
}
// 创建Prometheus规则
prometheusRule := &product.PrometheusRule{
Name: inspectionItem.Name,
Rule: inspectionItem.Rule,
LabelFilter: inspectionTag.Label,
Desc: inspectionItem.Description,
AlertInfo: inspectionItem.OutputTemplate,
DataSourceName: dataSource.Name,
}
// 添加到规则列表
prometheusRules = append(prometheusRules, prometheusRule)
}
// 创建规则集合
rules := product.Rules{
Prometheus: prometheusRules,
AliyunSafe: []*product.AliyunSafeRule{}, // 空列表,因为这里只处理Prometheus规则
}
// 创建产品
prod := &product.Product{
Name: inspectionJob.Name,
Rules: rules,
}
// 使用defer和recover捕获可能的panic
defer func() {
if r := recover(); r != nil {
// 记录panic信息
global.GVA_LOG.Error("执行巡检任务发生panic",
zap.Any("panic", r),
zap.String("jobName", inspectionJob.Name))
// 创建一个表示失败的结果并发送到结果通道
pr := &result.ProductsResult{ProductName: inspectionJob.Name}
// 为每个规则创建失败结果
for _, rule := range prometheusRules {
errorMsg := fmt.Sprintf("巡检执行失败: %v", r)
failureResult := result.NewRuleResult(
result.WithInspectionInfo(rule.Name),
result.WithInspectionResult(result.ABNORMAL),
result.WithInspectionErrorInfo(
[]map[string]string{{
"error": errorMsg,
"rule": rule.Rule,
}},
"执行规则 {{rule}} 时发生错误: {{error}}",
),
)
pr.Add(failureResult)
}
// 发送结果
resultCh <- pr
}
}()
// 执行巡检
err = prod.Run(resultCh)
if err != nil {
global.GVA_LOG.Error("执行巡检任务失败", zap.Error(err), zap.String("jobName", inspectionJob.Name))
return
}
global.GVA_LOG.Info("巡检任务已启动", zap.String("jobName", inspectionJob.Name))
}
在prod.Run
中,会去做真正的指标数据查询。
func (p *Product) Run(resultCh chan *result.ProductsResult) error {
global.GVA_LOG.Info(fmt.Sprintf("开始巡检, %s", p.Name))
pr := &result.ProductsResult{ProductName: p.Name}
// prometheus巡检规则
for _, prometheusRule := range p.Rules.Prometheus {
ruleInspectRes, err := prometheusRule.Run()
if err != nil {
return err
}
pr.Add(ruleInspectRes)
}
resultCh <- pr
return nil
}
然后调用prometheusRule.Run
获取结果。
func (r *PrometheusRule) Run() (*result.RuleResult, error) {
ds, err := datasource.GetByName(r.DataSourceName)
if err != nil {
return nil, err
}
pds, ok := ds.(*datasource.PrometheusDataSource)
if !ok {
return nil, fmt.Errorf("数据源类型错误: %s 不是Prometheus数据源", r.DataSourceName)
}
if pds.Client == nil {
return nil, fmt.Errorf("数据源为空: %s", r.DataSourceName)
}
res, err := pds.Run(r.Rule, r.LabelFilter)
if err != nil {
return nil, err
}
ruleRes := r.buildRuleResult(res)
return ruleRes, nil
}
func (r *PrometheusRule) buildRuleResult(resultLabels []map[string]string) *result.RuleResult {
if len(resultLabels) == 0 {
return result.NewRuleResult(result.WithInspectionInfo(fmt.Sprintf("%s", r.Name)),
result.WithInspectionResult(result.NORMAL))
}
return result.NewRuleResult(result.WithInspectionInfo(fmt.Sprintf("%s", r.Name)),
result.WithInspectionResult(result.ABNORMAL),
result.WithInspectionErrorInfo(resultLabels, r.AlertInfo))
}
具体的查询是封装在pds.Run
中的,它会去调用Prometheus
的接口去查询数据。
func Query(client api.Client, rule string) (model.Value, []string, error) {
// 添加空指针检查
if client == nil {
return nil, nil, errors.New("Prometheus client is nil")
}
v1Api := promV1.NewAPI(client)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
value, warnings, err := v1Api.Query(ctx, rule, time.Now(), promV1.WithTimeout(10*time.Second))
global.GVA_LOG.Debug("查询结果", zap.String("value", value.String()), zap.Any("warnings", warnings))
if err != nil {
return nil, nil, errors.WithStack(err)
}
return value, warnings, nil
}
(3)如果需要发送到企业微信,就会构建发送结果进行发送。
func (inspectionExecutionJobService *InspectionExecutionJobService) sendInspectionNotification(job *AutoInspection.InspectionExecutionJob, jobExecution *AutoInspection.JobExecution, results []*result.ProductsResult) {
// 获取通知配置
var notifyService = NotifyService{}
notify, err := notifyService.GetNotify(fmt.Sprintf("%d", job.NoticdId))
if err != nil {
global.GVA_LOG.Error("获取通知配置失败", zap.Error(err))
return
}
// 构建通知内容
// 1. 巡检摘要
taskCount := len(results) // 巡检任务数量
itemCount := 0 // 巡检项数量
normalCount := 0 // 正常项数量
abnormalCount := 0 // 异常项数量
abnormalItems := []string{} // 异常项列表
// 统计巡检项、正常项和异常项的数量
for _, task := range results {
itemCount += len(task.SubRuleResults)
for _, item := range task.SubRuleResults {
if item.InspectionResult == result.NORMAL {
normalCount++
} else if item.InspectionResult == result.ABNORMAL {
abnormalCount++
// 收集异常项信息
abnormalDetail := fmt.Sprintf("【%s】%s", task.ProductName, item.InspectionInfo)
if len(item.InspectionErrorInfo) > 0 {
abnormalDetail += "\n" + strings.Join(item.InspectionErrorInfo, "\n")
}
abnormalItems = append(abnormalItems, abnormalDetail)
}
}
}
// 格式化摘要信息
summary := fmt.Sprintf("巡检任务%d个,巡检项%d个,正常%d个,异常%d个", taskCount, itemCount, normalCount, abnormalCount)
// 构建企业微信通知内容
var content string
if notify.TemplateType == "markdown" {
// Markdown格式
content = fmt.Sprintf(`{
"msgtype": "markdown",
"markdown": {
"content": "# 自动化巡检结果通知\n\n> ### 执行作业:%s\n> ### 执行时间:%s\n> ### 执行结果:%s\n\n### **异常项列表:**\n%s"
}
}`,
jobExecution.ExecutionJobName,
jobExecution.EndTime.Format("2006-01-02 15:04:05"),
summary,
formatAbnormalItems(abnormalItems))
} else {
// 文本格式
content = fmt.Sprintf(`{
"msgtype": "text",
"text": {
"content": "巡检结果通知\n执行作业:%s\n执行时间:%s\n执行结果:%s\n\n异常项列表:\n%s"
}
}`,
jobExecution.ExecutionJobName,
jobExecution.EndTime.Format("2006-01-02 15:04:05"),
summary,
formatAbnormalItemsText(abnormalItems))
}
// 发送通知
ctx := context.Background()
sendParams := sender.SendParams{
NoticeType: notify.Type,
NoticeId: fmt.Sprintf("%d", notify.ID),
NoticeName: notify.Name,
Hook: notify.Address,
Content: content,
}
err = sender.Sender(&ctx, sendParams)
if err != nil {
global.GVA_LOG.Error("发送巡检通知失败", zap.Error(err))
return
}
global.GVA_LOG.Info("发送巡检通知成功",
zap.String("jobName", jobExecution.ExecutionJobName),
zap.String("summary", summary))
}
(4)PDF导出是用wkhtmltopdf
实现,该包依赖服务器上的wkhtmltopdf
命令。
func (jobExecutionService *JobExecutionService) GeneratePDF(jobExecution *AutoInspection.JobExecution) (string, error) {
pdf, err := wkhtmltopdf.NewPDFGenerator()
if err != nil {
global.GVA_LOG.Error("PDF生成器初始化失败", zap.Error(err))
return "", err
}
// 设置全局选项
pdf.Dpi.Set(300)
pdf.Orientation.Set(wkhtmltopdf.OrientationPortrait)
pdf.PageSize.Set(wkhtmltopdf.PageSizeA4)
pdf.MarginTop.Set(20)
pdf.MarginBottom.Set(20)
pdf.MarginLeft.Set(20)
pdf.MarginRight.Set(20)
// 渲染HTML模板
htmlContent, err := jobExecutionService.renderHTMLTemplate(jobExecution)
if err != nil {
global.GVA_LOG.Error("HTML模板渲染失败", zap.Error(err))
return "", err
}
// 创建一个页面并添加到生成器
page := wkhtmltopdf.NewPageReader(bytes.NewBufferString(htmlContent))
pdf.AddPage(page)
// 生成PDF
err = pdf.Create()
if err != nil {
return "", err
}
basePath := "uploads/pdf"
// 创建目录(如果不存在)
if err = os.MkdirAll(basePath, 0755); err != nil {
global.GVA_LOG.Error("创建PDF保存目录失败", zap.Error(err))
return "", err
}
filename := generatePDFFileName(jobExecution)
filePath := filepath.Join(basePath, filename)
// 3. 保存PDF到文件
if err = os.WriteFile(filePath, pdf.Bytes(), 0644); err != nil {
global.GVA_LOG.Error("保存PDF文件失败", zap.Error(err))
return "", err
}
....
return downloadURL, nil
}
以上就是实现巡检的主要代码。
最后
大部分企业虽然都有监控告警,但是自动化巡检在日常的运维工作中还是必要的,它可以聚合目前系统、集群存在的问题,避免遗漏告警信息。另外,在AI发展迅猛的今天,可以把AI也结合到自动化巡检中,比如在巡检中增加一些AI预测,AI故障诊断、AI根因分析等功能。