Skip to content

基于Golang开发一个接口拨测系统

之前写了一个《开发一个接口监控的 Prometheus Exporter》,当时只是单纯的实现了一个简单的 Exporter,但是基本能满足要求,最近对接口监控的需求做了升级,主要有:

  • 接口的管理通过前端页面实现,将数据存入数据库
  • 接口的校验除了可以校验状态码,还增加了返回值校验
  • 前端页面可以显示当前接口的可用性百分比
  • 拨测项可以灵活配置
    • 拨测频率可以灵活调整
    • 拨测结果校验可以灵活配置
    • 可以灵活开启或关闭拨测

功能的实现方式比较简单,梳理如下:

  • 用户创建拨测任务,将任务存入数据库
  • 后端为新的拨测起一个定时任务
  • 后端协程实时监听更新或者删除操作,更新定时任务
  • 拨测任务生成 Prometheus 指标,供 Prometheus 收集做监控告警使用

下面简单总结后端的实现和前端的效果。

Tips: 整个项目是使用gin-vue-admin搭建,拨测只是其中一个小功能。

后端实现

(1)定义数据库的结构体

go
// DialApi 拨测 结构体
type DialApi struct {
    global.GVA_MODEL
    Name           string `json:"name" form:"name" gorm:"column:name;default:'';comment:接口名称;size:32;"`                                       //接口名称
    Type           string `json:"type" form:"type" gorm:"column:type;default:'';comment:拨测类型 HTTP TCP PING DNS;size:8;"`                      // 拨测类型
    HttpMethod     string `json:"httpMethod" form:"httpMethod" gorm:"column:http_method;default:GET;comment:HTTP请求方法;size:8;"`                //HTTP请求方法
    Url            string `json:"url" form:"url" gorm:"column:url;comment:拨测地址;size:255;"binding:"required"`                                  //拨测地址
    RequestBody    string `json:"requestBody" form:"requestBody" gorm:"column:request_body;comment:请求BODY;size:255;"`                         //拨测地址
    Enabled        *bool  `json:"enabled" form:"enabled" gorm:"column:enabled;default:false;comment:是否启用;"binding:"required"`                 //是否启用
    Application    string `json:"application" form:"application" gorm:"column:application;comment:所属应用;size:32;"`                             //所属应用
    ExceptResponse string `json:"exceptResponse" form:"exceptResponse" gorm:"column:except_response;comment:预期返回值;size:32;"`                  //预期返回值
    HttpStatus     int    `json:"httpStatus" form:"httpStatus" gorm:"column:http_status;type:smallint(5);default:200;comment:预期状态码;size:16;"` //预期状态码
    Cron           string `json:"cron" form:"cron" gorm:"column:cron;comment:cron表达式;size:20;"`                                               //cron表达式
    SuccessRate    string `json:"successRate" form:"successRate" gorm:"column:success_rate;comment:拨测成功率"`
    CreatedBy      uint   `gorm:"column:created_by;comment:创建者"`
    UpdatedBy      uint   `gorm:"column:updated_by;comment:更新者"`
    DeletedBy      uint   `gorm:"column:deleted_by;comment:删除者"`
}

在结构体中,主要定义拨测相关字段,比如拨测地址,返回值,状态码,拨测频率等,这些字段都通过前端页面填写。

然后就是对拨测任务的增删改查,这类接口比较通用,可以直接复制gin-vue-admin中的实例进行修改。

(2)对于新创建的拨测任务,需要将其加入到定时任务中。在这里做了偷懒,直接使用gin-vue-admin的定时任务功能。因此,需要实现一个Run方法,如下:

go
type StartDialApi struct{}
type StartSingleDialApiTask struct{}

func (j *StartDialApi) Run() {
    var dialService = service.ServiceGroupApp.DialApiServiceGroup.DialApiService
    // 获取状态为打开的定时任务
    pageInfo := dialApiReq.DialApiSearch{}
    dialApiInfoList, _, err := dialService.GetDialApiInfoList(pageInfo)
    if err == nil {
       var option []cron.Option
       option = append(option, cron.WithSeconds())
       for _, dialApi := range dialApiInfoList {
          // 将cron的值变成表达式
          c := utils.ConvertToCronExpression(dialApi.Cron)
          dialApi.Cron = c
          dialService.AddSingleDialApiTimerTask(dialApi)
       }
    } else {
       global.GVA_LOG.Error("获取拨测任务列表失败")
    }
}

然后会调用dialService.AddSingleDialApiTimerTask实现定时任务的真正操作。

go
func (dialService *DialApiService) AddSingleDialApiTimerTask(dialApiEntity dialApi.DialApi) {
    var option []cron.Option
    option = append(option, cron.WithSeconds())
    idStr := strconv.Itoa(int(dialApiEntity.ID))
    cronName := global.DIAL_API + idStr
    taskName := global.DIAL_API + idStr
    task, found := global.GVA_Timer.FindTask(cronName, taskName)
    if !found {
       if *dialApiEntity.Enabled {
          _, err := global.GVA_Timer.AddTaskByFunc(cronName, dialApiEntity.Cron, func() {
             global.HealthCheckResults.WithLabelValues(dialApiEntity.Name, dialApiEntity.Type, "success").Add(0)
             global.HealthCheckResults.WithLabelValues(dialApiEntity.Name, dialApiEntity.Type, "failed").Add(0)
             switch dialApiEntity.Type {
             case "HTTP":
                ok := checkHTTP(dialApiEntity)
                if ok {
                   global.HealthCheckResults.WithLabelValues(dialApiEntity.Name, dialApiEntity.Type, "success").Add(1)
                } else {
                   global.HealthCheckResults.WithLabelValues(dialApiEntity.Name, dialApiEntity.Type, "failed").Add(1)
                }

                // 记录日志
                logHealthCheckResult(ok, nil, dialApiEntity, "HTTP")

                // 获取Prometheus指标并存入数据库
                getSuccessRateFromPrometheus(dialApiEntity)

             case "TCP", "DNS", "ICMP":
                var ok bool
                var err error
                switch dialApiEntity.Type {
                case "TCP":
                   ok, err = checkTCP(dialApiEntity)
                case "DNS":
                   ok, err = checkDNS(dialApiEntity)
                case "ICMP":
                   ok, err = checkICMP(dialApiEntity)
                }
                if ok {
                   global.HealthCheckResults.WithLabelValues(dialApiEntity.Name, dialApiEntity.Type, "success").Add(1)
                } else {
                   global.HealthCheckResults.WithLabelValues(dialApiEntity.Name, dialApiEntity.Type, "failed").Add(1)
                }

                // 记录日志
                logHealthCheckResult(ok, err, dialApiEntity, dialApiEntity.Type)

                // 获取Prometheus指标并存入数据库
                getSuccessRateFromPrometheus(dialApiEntity)
             default:
                global.GVA_LOG.Error("未知的检测类型",
                   zap.String("DetectType", dialApiEntity.Type),
                )
             }
          }, global.DIAL_API+idStr, option...)
          if err != nil {
             global.GVA_LOG.Error(fmt.Sprintf("添加拨测定时任务失败: %s : %s , 原因是: %s", idStr, dialApiEntity.Name, err.Error()))
          }
       }
    } else {
       if task.Spec != dialApiEntity.Cron {
          global.GVA_LOG.Info(fmt.Sprintf("修改定时任务时间: %s", dialApiEntity.Name))
          global.GVA_Timer.Clear(global.DIAL_API + idStr)
          dialService.AddSingleDialApiTimerTask(dialApiEntity)
       } else if !*dialApiEntity.Enabled || dialApiEntity.DeletedAt.Valid {
          global.GVA_LOG.Info(fmt.Sprintf("停止拨测任务: %s", dialApiEntity.Name))
          global.GVA_Timer.RemoveTaskByName(cronName, taskName)
       }
    }
}

在该方法中,先判断定时任务是否已经存在,只有不存在且开启拨测的任务才会加入定时任务。否则,就会执行修改或者删除逻辑。

另外,为了方便前端显示拨测成功率,每次执行任务的时候会计算一次成功率,这里采用的是直接计算 Prometheus 指标,使用getSuccessRateFromPrometheus方法实现,如下:

go
func getSuccessRateFromPrometheus(dialApiEntity dialApi.DialApi) {
    // 查询prometheus获取过去1小时的成功率
    successQuery := fmt.Sprintf(`sum(rate(health_check_results{name="%s", type="%s", status="success"}[1h]))`, dialApiEntity.Name, dialApiEntity.Type)
    totalQuery := fmt.Sprintf(`sum(rate(health_check_results{name="%s", type="%s"}[1h]))`, dialApiEntity.Name, dialApiEntity.Type)
    successResponse, err := utils.QueryPrometheus(successQuery, global.GVA_CONFIG.Prometheus.Address)
    if err != nil {
       global.GVA_LOG.Error("Failed to query success rate from Prometheus", zap.Error(err))
       return
    }
    totalResponse, err := utils.QueryPrometheus(totalQuery, global.GVA_CONFIG.Prometheus.Address)
    if err != nil {
       global.GVA_LOG.Error("Failed to query total rate from Prometheus", zap.Error(err))
       return
    }

    // 解析 Prometheus 响应并计算成功率
    var successValue float64
    var totalValue float64
    if len(successResponse.Data.Result) > 0 {
       for _, result := range successResponse.Data.Result {
          if value, ok := result.Value[1].(string); ok {
             if value, err := strconv.ParseFloat(value, 64); err == nil {
                successValue = value
             }
          }
       }
    }
    if len(totalResponse.Data.Result) > 0 {
       for _, result := range totalResponse.Data.Result {
          if value, ok := result.Value[1].(string); ok {
             if value, err := strconv.ParseFloat(value, 64); err == nil {
                totalValue = value
             }
          }
       }
    }

    if totalValue > 0 {
       successRate := CalculateSuccessRate(successValue, totalValue)

       // 获取数据库中最新的值
       var dialService = DialApiService{}
       dial, err := dialService.GetDialApi(strconv.Itoa(int(dialApiEntity.ID)))
       if err != nil {
          global.GVA_LOG.Error("获取任务失败", zap.String("err", err.Error()))
       }
       successRateStr := fmt.Sprintf("%.2f", successRate)
       if dial.SuccessRate != successRateStr {
          dial.SuccessRate = successRateStr
          err := dialService.UpdateDialApi(dial)
          if err != nil {
             global.GVA_LOG.Error("更新任务成功率失败", zap.String("err", err.Error()))
             return
          }
       }

    }
}

// CalculateSuccessRate 计算成功率
func CalculateSuccessRate(success, total float64) float64 {
    if total == 0 {
       return 0
    }
    return (success / total) * 100 // 返回百分比形式的成功率
}

另外,拨测任务支持HTTPTCPDNS以及ICMP(ICMP 功能未完善),代码如下:

go
func checkHTTP(dialApiEntity dialApi.DialApi) bool {
    idStr := strconv.Itoa(int(dialApiEntity.ID))
    var response *http.Response = nil
    var httpErr error = nil
    switch dialApiEntity.HttpMethod {
    case "GET":
       response, httpErr = http.Get(dialApiEntity.Url)
       break
    case "POST":
       response, httpErr = http.Post(dialApiEntity.Url, "application/json", strings.NewReader(dialApiEntity.RequestBody))
       break
    default:
    }
    if response != nil {
       dialApiRecrod := new(dialApi.DialApiRecrod)
       dialApiRecrod.DialApiId = dialApiEntity.ID
       dialApiRecrod.CreatedAt = time.Now()
       dialApiRecrod.UpdatedAt = time.Now()

       if httpErr == nil {
          if response.StatusCode == dialApiEntity.HttpStatus {
             // 如果定义了返回值判断
             if dialApiEntity.ExceptResponse != "" {
                bodyBytes, err := io.ReadAll(response.Body)
                if err != nil {
                   return false
                }
                if strings.Contains(string(bodyBytes), dialApiEntity.ExceptResponse) {
                   return true
                } else {
                   return false
                }
             } else {
                return true
             }
          } else {
             global.GVA_LOG.Info(idStr + ":" + dialApiEntity.Name + "拨测结果与预期不一致")
             return false
          }
       } else {
          global.GVA_LOG.Error("拨测失败: " + dialApiEntity.Url)
          dialApiRecrod.FailReason = httpErr.Error()
          return false
       }
    }
    return false
}

func checkTCP(dialApiEntity dialApi.DialApi) (bool, error) {
    conn, err := net.DialTimeout("tcp", dialApiEntity.Url, 5*time.Second)
    if err != nil {
       return false, err
    }
    defer conn.Close()
    return true, nil
}

func checkDNS(dialApiEntity dialApi.DialApi) (bool, error) {
    _, err := net.LookupHost(dialApiEntity.Url)
    if err != nil {
       return false, err
    }
    return true, nil
}

func checkICMP(dialApiEntity dialApi.DialApi) (bool, error) {
    pinger, err := ping.NewPinger(dialApiEntity.Url)
    if err != nil {
       return false, err
    }
    pinger.Count = 2
    err = pinger.Run() // Blocks until finished.
    if err != nil {
       return false, err
    }
    return true, nil
}

其中HTTP拨测是比较常用的,相比之前的 Prometheus Exporter,这里丰富了对结果的校验,使拨测的结果值更准确。

(3)如果遇到拨测任务的更新或者删除,有一个定时的协程去处理。如下:

go
func startUpdateDialCron() {
    var dialService = service.ServiceGroupApp.DialApiServiceGroup.DialApiService
    for {
       select {
       case updateId := <-global.UpdateDialAPIChannel:
          // 获取数据
          if updateId != "" {
             dial, err := dialService.GetDialApi(updateId)
             if err != nil {
                global.GVA_LOG.Error("获取任务失败", zap.String("err", err.Error()))
                continue
             } else {
                // 先删除旧的定时任务
                global.GVA_LOG.Info("更新定时任务", zap.String("updateId", updateId))
                cronName := global.DIAL_API + updateId
                taskName := global.DIAL_API + updateId
                if _, found := global.GVA_Timer.FindTask(cronName, taskName); found {
                   global.GVA_Timer.Clear(cronName)
                   // 启动新的定时任务
                   // 将cron的值变成表达式
                   c := utils.ConvertToCronExpression(dial.Cron)
                   dial.Cron = c
                   dialService.AddSingleDialApiTimerTask(dial)
                }
             }
          }
       case deleteId := <-global.DeleteDialAPIChannel:
          if deleteId != "" {
             cronName := global.DIAL_API + deleteId
             taskName := global.DIAL_API + deleteId
             if _, found := global.GVA_Timer.FindTask(cronName, taskName); found {
                global.GVA_LOG.Info("删除定时任务", zap.String("updateId", deleteId))
                global.GVA_Timer.RemoveTaskByName(cronName, taskName)
             }
          }
       }
    }
}

该协程监听global.UpdateDialAPIChannelglobal.DeleteDialAPIChannel这两个 channel,然后再调用dialService.AddSingleDialApiTimerTask对定时任务进行操作。

上面就是简单的接口拨测的功能实现,因能力有限,所以代码比较混乱。

前端展示

为了便于日常的维护,所以开发一个前端界面,主要支持拨测任务的增删改查。

新增拨测任务,可以灵活选择拨测类型以及定义返回值和状态码。 bfe310243362e5fc9da076e15e632137 MD5

然后可以查看拨测任务的具体情况,也可以灵活开启或者关闭或者任务。 [[附件/images/b4f8947b206a55c077b3bde78e937b16_MD5.jpeg|Open: Pasted image 20241111101737.png]]

监控告警

在前端页面只是展示了成功率,实际告警还是通过 Prometheus 实现,该平台暂未实现直接配置告警。

所以,只需要创建一个 Prometheus 收集的 Job,就可以查看对应的指标,指标名是health_check_results,如下:

[[附件/images/698fb2dab0ec402d8b7faeca1a165468_MD5.jpeg|Open: Pasted image 20241111101816.png]] 698fb2dab0ec402d8b7faeca1a165468 MD5

然后再配置一个告警规则,在使用率低于 100%的时候发送告警通知,如下:

[[附件/images/c3d19966bc0830f29aee09bccb061d79_MD5.jpeg|Open: Pasted image 20241111101826.png]] c3d19966bc0830f29aee09bccb061d79 MD5

至此,整个功能就实现了,足够满足日常使用。在公有云上,是有成熟的拨测产品,不过有的收费比较贵,好处是可以实现不同地区的拨测,覆盖面比较广。另外,也可以使用 Black Exporter 实现拨测,其也支持上面的所有功能,只是没有前端的维护界面,不过功能强大很多。

最近更新