✍ 道路千万条,安全第一条。操作不规范,运维两行泪。
一、引言
在当今复杂的 IT 系统架构中,监控体系对于保障系统的稳定运行至关重要。而 Alertmanager 作为监控体系里关键的一环,在处理告警信息、确保相关人员及时响应等方面发挥着无可替代的作用。它就像是一个信息枢纽,接收来自各个监控源的告警信息,经过一系列智能处理后,精准地将关键信息传递给相关人员。
接下来,让我们深入探索 Alertmanager 的实现逻辑,通过源码分析,一窥其背后的运行机制。这不仅能帮助我们更好地理解和运用 Alertmanager,还能为优化监控体系提供有力的支持。
二、Alertmanager 初相识
(一)功能概览
Alertmanager 具备一系列强大的功能,旨在高效处理和管理告警信息。
告警去重:在复杂的监控环境中,同一问题可能会产生多个重复的告警,这不仅会干扰运维人员的判断,还可能导致重要信息被淹没。Alertmanager 通过独特的算法,比较告警的标签、内容等关键信息,精准识别并去除重复的告警。例如,当某个服务器的 CPU 使用率持续过高,多个监控指标可能会同时触发告警,但 Alertmanager 能够将这些重复告警合并为一个,确保运维人员只收到一次通知 ,有效减少了告警噪音,让运维人员能够专注于真正的问题。
告警分组:将相似的告警进行分组,是 Alertmanager 的又一核心功能。通过合理的分组策略,能够将大量分散的告警信息整理成有序的集合,提高告警的可读性和管理效率。比如,在一个大型电商系统中,可能会有多个与订单处理相关的服务出现故障,如订单创建失败、订单支付异常等告警。Alertmanager 可以根据预先设定的规则,将这些与订单处理相关的告警归为一组,以单一通知的形式发送给负责订单业务的运维团队。这样,运维人员可以一次性了解到订单业务相关的所有问题,快速定位故障范围,提高故障处理的效率。
告警路由:Alertmanager 支持根据告警的标签、内容等属性,将告警精准地路由到不同的接收器,如电子邮件、Slack、PagerDuty 等。这一功能使得告警能够及时送达至最合适的人员或团队手中,确保问题得到及时处理。例如,对于与网络相关的告警,可以配置 Alertmanager 将其发送给网络运维团队的 Slack 群组;而对于与数据库相关的告警,则发送到数据库管理员的邮箱。通过灵活的路由配置,实现了告警通知的个性化和精准化,大大提高了告警响应的及时性和准确性。
告警抑制:在某些情况下,一个告警的产生可能会引发一系列其他相关告警。为了避免在这种情况下运维人员收到过多冗余的告警通知,Alertmanager 提供了告警抑制功能。通过设置抑制规则,当某个特定告警被触发后,其他与之相关的告警可以被临时抑制。例如,当整个数据中心的网络出现故障时,可能会导致大量服务器和服务的连接异常告警。此时,可以配置 Alertmanager,当数据中心网络故障的告警被触发后,抑制所有服务器和服务的连接异常告警,只保留网络故障的告警通知,这样可以有效避免告警风暴,让运维人员能够快速定位到问题的根源 。
(二)工作流程总览
Alertmanager 的工作流程从接收告警开始,历经多个关键环节,最终将处理后的告警信息发送给相应的接收者。
首先,Alertmanager 通过其 HTTP API 接收来自 Prometheus 或其他监控系统发送的告警信息。这些告警信息包含了丰富的元数据,如告警名称、描述、标签、发生时间等。例如,Prometheus 在监测到某个服务器的 CPU 使用率超过 80% 且持续 5 分钟后,会向 Alertmanager 发送一条告警信息,其中包括告警名称“High CPU Usage”、详细描述“Server X's CPU usage has exceeded 80% for 5 minutes”,以及相关标签如“server=X”“service=backend”等。
接收告警后,Alertmanager 会进行去重处理。它会根据告警的标签和内容,判断是否存在重复的告警事件。如果发现重复告警,会将其合并,确保在一定时间内,同一告警只会被通知一次。
接着,进入告警分组环节。根据预先设定的分组规则,Alertmanager 会将具有相同或相似标签的告警归为一组。例如,根据“service”标签进行分组,所有与“backend”服务相关的告警会被合并成一个组。分组后的告警信息会以更清晰、有条理的方式呈现给运维人员,便于他们进行统一处理。
之后,告警路由开始发挥作用。Alertmanager 会根据告警的属性和配置的路由规则,将告警分发到相应的接收器。例如,对于“severity=critical”且“service=backend”的告警,会被路由到后端运维团队的 Slack 群组;而对于“severity=warning”且“service=frontend”的告警,则会发送到前端运维团队的电子邮件地址。
在整个过程中,Alertmanager 还会根据配置的抑制规则进行抑制判断。如果满足抑制条件,某些相关告警的通知将被临时抑制,避免过多冗余告警的干扰。
最后,通过配置好的接收器,如电子邮件、Slack 等,将处理后的告警信息发送给相应的人员或团队。这样,运维人员能够及时获取到关键的告警信息,采取相应的措施进行处理,保障系统的稳定运行。
三、核心功能的实现逻辑
(一)告警去重机制
1. 哈希算法原理
Alertmanager 采用哈希算法实现告警去重。它通过对告警的标签进行特定计算,生成一个哈希值。以代码中的 hashAlert
函数为例,具体步骤如下:
func hashAlert(a *types.Alert) uint64 {
const sep = '\xff'
hb := hashBuffers.Get().(*hashBuffer)
defer hashBuffers.Put(hb)
b := hb.buf[:0]
names := make(model.LabelNames, 0, len(a.Labels))
for ln := range a.Labels {
names = append(names, ln)
}
sort.Sort(names)
for _, ln := range names {
b = append(b, string(ln)...)
b = append(b, sep)
b = append(b, string(a.Labels[ln])...)
b = append(b, sep)
}
hash := xxhash.Sum64(b)
return hash
}
首先,它将告警 a
的所有标签名提取出来,存入 names
切片中。接着,对 names
进行排序,这一步至关重要,确保了相同标签集合无论以何种顺序输入,都能得到一致的处理结果。然后,遍历排序后的 names
,将标签名、分隔符 sep
、标签值以及分隔符依次追加到字节切片 b
中。最后,使用 xxhash.Sum64
函数对字节切片 b
进行哈希计算,得到最终的哈希值。这个哈希值就像告警的“指纹”,用于唯一标识该告警。如果两个告警的哈希值相同,那么在去重机制中,它们就被视为重复告警。
2. 去重流程
在 Alertmanager 的去重过程中,DedupStage
阶段起着关键作用。其核心逻辑如下:
func (n *DedupStage) Exec(ctx context.Context, _ *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
gkey, ok := GroupKey(ctx)
if !ok {
return ctx, nil, errors.New("group key missing")
}
repeatInterval, ok := RepeatInterval(ctx)
if !ok {
return ctx, nil, errors.New("repeat interval missing")
}
firingSet := map[uint64]struct{}{}
resolvedSet := map[uint64]struct{}{}
firing := []uint64{}
resolved := []uint64{}
var hash uint64
for _, a := range alerts {
hash = n.hash(a)
if a.Resolved() {
resolved = append(resolved, hash)
resolvedSet[hash] = struct{}{}
} else {
firing = append(firing, hash)
firingSet[hash] = struct{}{}
}
}
ctx = WithFiringAlerts(ctx, firing)
ctx = WithResolvedAlerts(ctx, resolved)
entries, err := n.nflog.Query(nflog.QGroupKey(gkey), nflog.QReceiver(n.recv))
if err != nil && !errors.Is(err, nflog.ErrNotFound) {
return ctx, nil, err
}
var entry *nflogpb.Entry
switch len(entries) {
case 0:
case 1:
entry = entries[0]
default:
return ctx, nil, fmt.Errorf("unexpected entry result size %d", len(entries))
}
if n.needsUpdate(entry, firingSet, resolvedSet, repeatInterval) {
return ctx, alerts, nil
}
return ctx, nil, nil
}
在这个函数中,首先初始化了两个用于存储哈希值的集合 firingSet
和 resolvedSet
,分别用于记录触发状态和已解决状态的告警哈希值,同时初始化了两个用于存储哈希值的切片 firing
和 resolved
。然后,遍历传入的所有告警 alerts
,通过 n.hash(a)
计算每个告警的哈希值。根据告警的状态(a.Resolved()
判断是否已解决),将哈希值分别存入对应的集合和切片中。接着,通过 WithFiringAlerts
和 WithResolvedAlerts
函数将计算得到的触发和已解决的告警哈希值切片存入上下文 ctx
中,以便后续阶段使用。之后,从日志 nflog
中查询与当前告警组和接收器相关的记录 entries
。根据查询结果,判断是否需要更新告警信息。如果 n.needsUpdate
返回 true
,则说明当前实例需要继续发送这些告警,函数返回当前上下文 ctx
、原始告警列表 alerts
以及 nil
错误,表示处理成功;否则,返回当前上下文 ctx
、空的告警列表 nil
以及 nil
错误,意味着这些告警已被其他实例发送,本实例不再重复发送。通过这一系列步骤,Alertmanager 有效地实现了告警去重,避免了重复告警对运维人员的干扰。
(二)告警分组策略
1. 分组依据
Alertmanager 主要依据告警的标签来进行分组。通过配置 group_by
参数,可以指定按照哪些标签进行分组。例如,当配置 group_by: ['alertname', 'cluster']
时,具有相同 alertname
和 cluster
标签值的告警会被归为一组。假设在一个分布式系统中,有多个服务实例运行在不同的集群上。当某个服务出现故障时,会产生多个告警,每个告警都带有 alertname
(如 ServiceDown
)和 cluster
(如 cluster1
、cluster2
等)标签。根据上述配置,所有 alertname
为 ServiceDown
且 cluster
相同的告警会被分到一组。这样,运维人员可以清晰地看到每个集群中该服务的故障情况,而不是面对大量零散的告警信息,大大提高了故障排查的效率。如果在一个电商系统中,订单服务出现问题,可能会触发多个与订单相关的告警,如订单创建失败、订单支付失败等。通过合理设置 group_by
参数,将这些告警按照订单服务相关的标签进行分组,运维人员可以快速了解订单服务整体的故障状况,而不是被众多单独的告警所困扰。
2. 分组时间控制
分组时间控制涉及到 group_wait
和 group_interval
两个重要参数。
group_wait
表示在发送一个告警组的通知之前,Alertmanager 等待新告警加入同一组的时间。例如,当设置 group_wait: 30s
时,如果在这 30 秒内,有新的符合分组条件的告警产生,它们会被添加到当前组中,然后一起发送通知。这就好比在收集货物准备发货,在 30 秒的等待时间内,不断有新的货物(新告警)到达,都被装进同一个包裹(告警组),30 秒后,这个包裹被寄出(发送告警组通知)。这样可以避免短时间内频繁发送小的告警组通知,减少通知的数量,提高运维人员的处理效率。
group_interval
则定义了在一个告警组已经发送通知后,再次发送该组更新通知之前需要等待的时间。例如,设置 group_interval: 5m
,当一个告警组在某一时刻发送了通知后,在接下来的 5 分钟内,即使该组有新的告警加入或状态发生变化,Alertmanager 也不会立即发送更新通知。只有在 5 分钟之后,才会重新评估该组是否需要发送新的通知。这有助于防止在短时间内对同一问题进行过度通知,避免运维人员被频繁的告警更新所打扰。比如,某个服务的短暂波动可能会在短时间内产生多个告警,但通过合理设置 group_interval
,可以将这些告警的更新合并在一个适当的时间点发送,让运维人员能够更有效地处理告警信息。
(三)告警路由规则
1. 路由树结构
Alertmanager 的路由规则以路由树的形式组织。路由树的每个节点都包含了一系列的配置信息,用于决定如何处理接收到的告警。节点的核心构成包括匹配条件和 Continue
字段。
匹配条件用于判断告警是否与该节点匹配。例如,通过设置 match: {severity: 'critical'}
,表示当告警的 severity
标签值为 critical
时,该告警与这个节点匹配。还可以使用正则表达式进行更灵活的匹配,如 match_re: {service: '^backend.*'}
,这会匹配所有 service
标签以 backend
开头的告警。
Continue
字段则决定了告警在匹配当前节点后的行为。当 Continue
为 false
(默认值)时,一旦告警匹配到当前节点,就会停止在路由树中的进一步匹配,直接按照该节点的配置进行处理,如将告警发送到指定的接收器。例如,在一个简单的路由配置中,顶级节点配置了 receiver: 'default_receiver'
,一个子节点配置为 match: {severity: 'critical'}, receiver: 'critical_receiver', Continue: false
。当一个 severity
为 critical
的告警到达时,它会匹配到这个子节点,由于 Continue
为 false
,告警将直接被发送到 critical_receiver
,不再继续检查其他子节点。而当 Continue
为 true
时,告警在匹配当前节点后,会继续在路由树中向下匹配其他节点,直到找到最合适的处理方式。假设存在另一个子节点配置为 match: {service: 'database'}, receiver: 'database_receiver', Continue: true
,且告警的 service
标签为 database
,即使它先匹配到了前面的 critical
节点,由于该节点 Continue
为 true
,告警仍会继续匹配到这个 database
节点,并最终可能被发送到 database_receiver
,具体取决于后续的匹配情况。通过这种灵活的路由树结构和配置,Alertmanager 能够根据告警的各种属性,将其准确地路由到相应的接收器,实现高效的告警分发。
2. 路由匹配过程
当 Alertmanager 接收到一个告警时,会从路由树的顶级节点开始进行匹配。以如下代码所示的路由配置为例:
route:
group_by: ['alertname']
group_wait: 30s
group_interval: 5m
repeat_interval: 4h
receiver: 'default_receiver'
routes:
- match:
severity: 'critical'
service: 'backend'
receiver: 'backend_critical_receiver'
continue: false
- match:
severity: 'warning'
service: 'backend'
receiver: 'backend_warning_receiver'
continue: true
- match:
service: 'frontend'
receiver: 'frontend_receiver'
当一个告警到达后,首先会检查其是否满足顶级节点的条件(这里顶级节点主要是设置了一些默认参数和分组相关配置)。然后,依次检查子节点的匹配条件。假设一个告警的标签为 severity: 'critical', service: 'backend'
,它会首先匹配到第一个子节点,由于该子节点的 Continue
为 false
,告警将直接被发送到 backend_critical_receiver
,不再继续检查其他子节点。若一个告警的标签为 severity: 'warning', service: 'backend'
,它会匹配到第二个子节点,因为 Continue
为 true
,告警会继续检查后续子节点。如果后续没有其他更匹配的节点,它将根据该节点的配置被发送到 backend_warning_receiver
。再假设一个告警的标签为 service: 'frontend'
,它不会匹配到前两个子节点,但会匹配到第三个子节点,最终被发送到 frontend_receiver
。通过这样逐步的匹配过程,Alertmanager 能够根据告警的具体属性,将其精准地路由到最合适的接收器,确保告警能够被及时、准确地处理。
(四)告警抑制功能
1. 抑制规则定义
在 Alertmanager 的配置文件中,通过 inhibit_rules
部分定义抑制规则。每个抑制规则包含以下三个主要部分:
-
source_match
和source_match_re
:定义触发抑制的告警的匹配条件。这些告警通常是高等级的告警。source_match
用于精确匹配标签值,而source_match_re
用于正则表达式匹配。 -
target_match
和target_match_re
:定义将被抑制的告警的匹配条件。这些告警通常是低等级的告警。同样,target_match
用于精确匹配,target_match_re
用于正则表达式匹配。 -
equal
:定义触发抑制的告警和被抑制的告警之间必须匹配的标签。只有当这些标签的值相同时,抑制规则才会生效。
2、抑制逻辑的处理流程
-
告警接收与存储:Alertmanager 通过 API 接收来自 Prometheus 的告警信息,并将其存储在内存中的
Alert Provider
中。 -
告警匹配与分组:Dispatcher 组件从
Alert Provider
订阅告警信息,并根据配置的路由规则(route
)对告警进行匹配和分组。 -
抑制规则的匹配:在告警分组后,进入 Notification Pipeline 组件的
InhibitStage
阶段。此阶段会检查当前告警是否满足抑制规则:-
首先,检查当前告警是否匹配
target_match
和target_match_re
定义的条件。 -
然后,检查是否存在已触发的告警(即
source
告警),且该source
告警满足source_match
和source_match_re
定义的条件。 -
最后,检查
source
告警和当前告警的equal
标签值是否相同。如果所有条件都满足,则当前告警被标记为抑制状态,不会发送通知。
-
-
后续处理:被抑制的告警不会进入后续的通知发送阶段,从而避免了不必要的告警通知。
2. 告警抑制的实现源码
- 抑制规则的定义与加载 在 Alertmanager 配置文件加载时,会解析
inhibit_rules
配置,并将其转换为内部的抑制规则结构。相关代码如下:
// inhibit/inhibitor.go
func NewInhibitor(ap provider.Alerts, rs []config.InhibitRule, mk types.AlertMarker, logger *slog.Logger) *Inhibitor {
ih := &Inhibitor{
alerts: ap,
marker: mk,
logger: logger,
}
for _, cr := range rs {
r := NewInhibitRule(cr)
ih.rules = append(ih.rules, r)
}
return ih
}
这里,config.InhibitRule
是从配置文件中解析出的抑制规则结构,Inhibitor
结构体用于管理这些规则。
- 抑制规则的匹配 在
InhibitStage
阶段,会调用Inhibitor
的Mutes
方法来检查告警是否满足抑制规则。相关代码如下:
// inhibit/inhibit.go
func (ih *Inhibitor) Mutes(lset model.LabelSet) bool {
fp := lset.Fingerprint()
for _, r := range ih.rules {
if !r.TargetMatchers.Matches(lset) {
// If target side of rule doesn't match, we don't need to look any further.
continue
}
// If we are here, the target side matches. If the source side matches, too, we
// need to exclude inhibiting alerts for which the same is true.
if inhibitedByFP, eq := r.hasEqual(lset, r.SourceMatchers.Matches(lset)); eq {
ih.marker.SetInhibited(fp, inhibitedByFP.String())
return true
}
}
ih.marker.SetInhibited(fp)
return false
}
Mutes
方法会遍历所有定义的抑制规则,调用每个规则的 Mutes
方法来判断当前告警是否满足该规则。
告警抑制的方法在 notify/notify.go
中的抑制阶段 MuteStage
结构体中进行调用,如下:
// MuteStage filters alerts through a Muter.type MuteStage struct {
muter types.Muter
metrics *Metrics
}
// NewMuteStage return a new MuteStage.
func NewMuteStage(m types.Muter, metrics *Metrics) *MuteStage {
return &MuteStage{muter: m, metrics: metrics}
}
// Exec implements the Stage interface.func (n *MuteStage) Exec(ctx context.Context, logger *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
var (
filtered []*types.Alert
muted []*types.Alert
)
for _, a := range alerts {
// TODO(fabxc): increment total alerts counter.
// Do not send the alert if muted.
if n.muter.Mutes(a.Labels) {
muted = append(muted, a)
} else {
filtered = append(filtered, a)
}
// TODO(fabxc): increment muted alerts counter if muted.
}
if len(muted) > 0 {
var reason string
switch n.muter.(type) {
case *silence.Silencer:
reason = SuppressedReasonSilence
case *inhibit.Inhibitor:
reason = SuppressedReasonInhibition
default:
}
n.metrics.numNotificationSuppressedTotal.WithLabelValues(reason).Add(float64(len(muted)))
logger.Debug("Notifications will not be sent for muted alerts", "alerts", fmt.Sprintf("%v", muted), "reason", reason)
}
return ctx, filtered, nil
}
通过 n.muter.Mutes(a.Labels)
来判断告警是否被抑制,如果未被抑制,则存入 filtered
结构体进行后续处理,反之存入 muted
结构体,这些告警不发送通知。
通过以上源码分析可以看出,Alertmanager 的告警抑制功能是通过配置文件定义规则,然后在告警处理流程中逐条检查告警是否满足抑制规则来实现的
四、源码深度剖析
(一)关键数据结构
1. Alert 结构
在 Alertmanager 的源码中,Alert
结构体是表示告警信息的核心数据结构。其定义如下:
type Alert struct {
GeneratorURL strfmt.URI `json:"generatorURL,omitempty"`
Labels LabelSet `json:"labels"`
}
Labels
字段是一个 LabelSet
类型的键值对集合,用于唯一标识告警,并在告警分组、路由和去重等操作中发挥关键作用。例如,在告警分组时,通过比较不同告警的 Labels
中指定的标签,将具有相同标签值的告警归为一组。假设在一个分布式系统中,有多个服务实例,每个服务实例的告警都带有 service
、instance
等标签。当配置 group_by: ['service']
时,Labels
中 service
标签值相同的告警会被分到同一组。
2. 其他重要结构
除了 Alert
结构,还有一些与告警处理紧密相关的重要结构。
Group
结构在告警分组中起着关键作用。它将多个相关的 Alert
组合在一起,方便进行统一处理和通知。一个 Group
通常包含一组具有相同或相似特征的告警,这些特征由配置的 group_by
标签决定。例如,在一个包含多个微服务的系统中,根据 service
标签进行分组,所有与 user-service
相关的告警会被归到同一个 Group
中。在代码实现中,Group
结构可能包含 Labels
(用于标识该组告警的公共标签)、Alerts
(该组内的告警列表)等字段,如下所示:
type AlertGroup struct {
Alerts []*GettableAlert `json:"alerts"`
Labels LabelSet `json:"labels"`
Receiver *Receiver `json:"receiver"`
}
在告警分组过程中,通过计算告警的标签指纹,将具有相同指纹(即相同 group_by
标签组合)的告警添加到同一个 Group
中。这使得运维人员在收到通知时,可以一次性了解到与某个特定服务或场景相关的所有告警情况,而不是面对大量零散的告警信息,大大提高了告警处理的效率。
Route
结构则用于定义告警的路由规则。它以树状结构组织,每个节点都包含了一系列的配置信息,用于决定如何处理接收到的告警。其核心构成包括 Matchers
(匹配条件)、Receiver
(接收器)和 Continue
字段。
type Route struct {
parent *Route
// The configuration parameters for matches of this route.
RouteOpts RouteOpts
// Matchers an alert has to fulfill to match // this route.
Matchers labels.Matchers
// If true, an alert matches further routes on the same level.
Continue bool
// Children routes of this route.
Routes []*Route
}
Matchers
用于判断告警是否与该节点匹配。例如,通过设置 Matchers: labels.Matchers{{Name: "severity", Value: "critical"}}
,表示当告警的 severity
标签值为 critical
时,该告警与这个节点匹配。Receiver
指定了匹配该节点的告警要发送到的接收器,如电子邮件地址、Slack 频道等。Continue
字段决定了告警在匹配当前节点后的行为。当 Continue
为 false
(默认值)时,一旦告警匹配到当前节点,就会停止在路由树中的进一步匹配,直接按照该节点的配置进行处理,如将告警发送到指定的接收器。而当 Continue
为 true
时,告警在匹配当前节点后,会继续在路由树中向下匹配其他节点,以寻找更合适的处理方式。通过这种灵活的路由树结构和配置,Alertmanager 能够根据告警的各种属性,将其准确地路由到相应的接收器,实现高效的告警分发。
(二)核心处理流程
1. API 接收告警
在 Alertmanager 中,API 接收告警的功能主要由 api/v2/api.go
文件中的代码实现。当 Alertmanager 接收到来自 Prometheus 或其他监控系统发送的告警时,会调用 postAlertsHandler
函数,其代码如下:
func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware.Responder {
logger := api.requestLogger(params.HTTPRequest)
alerts := OpenAPIAlertsToAlerts(params.Alerts)
now := time.Now()
api.mtx.RLock()
resolveTimeout := time.Duration(api.alertmanagerConfig.Global.ResolveTimeout)
api.mtx.RUnlock()
for _, alert := range alerts {
alert.UpdatedAt = now
// Ensure StartsAt is set.
if alert.StartsAt.IsZero() {
if alert.EndsAt.IsZero() {
alert.StartsAt = now
} else {
alert.StartsAt = alert.EndsAt
}
}
// If no end time is defined, set a timeout after which an alert
// is marked resolved if it is not updated. if alert.EndsAt.IsZero() {
alert.Timeout = true
alert.EndsAt = now.Add(resolveTimeout)
}
if alert.EndsAt.After(time.Now()) {
api.m.Firing().Inc()
} else {
api.m.Resolved().Inc()
}
}
// Make a best effort to insert all alerts that are valid.
var (
validAlerts = make([]*types.Alert, 0, len(alerts))
validationErrs = &types.MultiError{}
)
for _, a := range alerts {
removeEmptyLabels(a.Labels)
if err := a.Validate(); err != nil {
validationErrs.Add(err)
api.m.Invalid().Inc()
continue
}
validAlerts = append(validAlerts, a)
}
if err := api.alerts.Put(validAlerts...); err != nil {
logger.Error("Failed to create alerts", "err", err)
return alert_ops.NewPostAlertsInternalServerError().WithPayload(err.Error())
}
if validationErrs.Len() > 0 {
logger.Error("Failed to validate alerts", "err", validationErrs.Error())
return alert_ops.NewPostAlertsBadRequest().WithPayload(validationErrs.Error())
}
return alert_ops.NewPostAlertsOK()
}
首先,函数将接收到的告警数据 params.Alerts
通过 OpenAPIAlertsToAlerts
函数进行转换,得到 alerts
列表。接着,对每个告警 a
进行处理。调用 removeEmptyLabels
函数清理告警标签中的空值,确保标签数据的有效性。然后,通过 a.Validate()
方法对告警进行全面校验,包括检查标签的格式是否正确、是否包含必要的标签等。如果校验失败,将错误信息添加到 validationErrs
中,并增加无效告警的统计计数 api.m.Invalid().Inc()
,同时跳过该无效告警,继续处理下一个。
经过校验后,将所有有效的告警 validAlerts
通过 api.alerts.Put
方法存入 alerts
存储结构中。这里的 api.alerts
是一个实现了 Alerts
接口的实例,通常是基于内存的 mem.Alerts
实现。在 mem.Alerts
的 Put
方法中,会为每个告警生成唯一的指纹(基于标签计算),并将告警存储到内部的 store.Alerts
结构(本质是一个 map[model.Fingerprint]*types.Alert
)中。如果存储过程中发生错误,记录错误日志并返回 500 Internal Server Error
响应给发送方,告知告警接收失败。若所有操作成功,则返回 200 OK
响应,表明告警已成功接收并存储。通过这一系列严谨的步骤,Alertmanager 实现了对告警的准确接收、校验和存储,为后续的告警处理流程提供了可靠的数据基础。
2. Dispatcher 分发告警
Dispatcher 在 Alertmanager 中承担着分发告警的重要职责,其核心逻辑在 dispatch/dispatch.go
文件中。Dispatcher 通过订阅 alerts
存储结构(通常是 mem.Alerts
)的 Subscribe
方法,获取到新的告警信息。其 Run
函数如下:
func (d *Dispatcher) Run() {
d.done = make(chan struct{})
d.mtx.Lock()
d.aggrGroupsPerRoute = map[*Route]map[model.Fingerprint]*aggrGroup{}
d.aggrGroupsNum = 0
d.metrics.aggrGroups.Set(0)
d.ctx, d.cancel = context.WithCancel(context.Background())
d.mtx.Unlock()
d.run(d.alerts.Subscribe())
close(d.done)
}
在 run
函数中,通过不断从 it.Next()
获取新的告警 alert
,并针对每个接收到的告警,查找匹配的路由规则:
func (d *Dispatcher) run(it provider.AlertIterator) {
maintenance := time.NewTicker(30 * time.Second)
defer maintenance.Stop()
defer it.Close()
for {
select {
case alert, ok := <-it.Next():
if !ok {
// Iterator exhausted for some reason.
if err := it.Err(); err != nil {
d.logger.Error("Error on alert update", "err", err)
}
return
}
d.logger.Debug("Received alert", "alert", alert)
// Log errors but keep trying.
if err := it.Err(); err != nil {
d.logger.Error("Error on alert update", "err", err)
continue
}
now := time.Now()
for _, r := range d.route.Match(alert.Labels) {
d.processAlert(alert, r)
}
d.metrics.processingDuration.Observe(time.Since(now).Seconds())
case <-maintenance.C:
d.doMaintenance()
case <-d.ctx.Done():
return
}
}
}
d.route.Match(alert.Labels)
函数会遍历路由树,查找与告警标签匹配的路由节点 r
。一旦找到匹配的路由节点,就调用 processAlert
函数对告警进行处理。
processAlert
函数的主要工作是将告警分配到对应的聚合组 aggrGroup
中,代码如下:
func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
groupLabels := getGroupLabels(alert, route)
fp := groupLabels.Fingerprint()
d.mtx.Lock()
defer d.mtx.Unlock()
routeGroups, ok := d.aggrGroupsPerRoute[route]
if !ok {
routeGroups = map[model.Fingerprint]*aggrGroup{}
d.aggrGroupsPerRoute[route] = routeGroups
}
ag, ok := routeGroups[fp]
if ok {
ag.insert(alert)
return
}
// If the group does not exist, create it. But check the limit first.
if limit := d.limits.MaxNumberOfAggregationGroups(); limit > 0 && d.aggrGroupsNum >= limit {
d.metrics.aggrGroupLimitReached.Inc()
d.logger.Error("Too many aggregation groups, cannot create new group for alert", "groups", d.aggrGroupsNum, "limit", limit, "alert", alert.Name())
return
}
ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger)
routeGroups[fp] = ag
d.aggrGroupsNum++
d.metrics.aggrGroups.Inc()
// Insert the 1st alert in the group before starting the group's run()
// function, to make sure that when the run() will be executed the 1st
// alert is already there.
ag.insert(alert)
go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
_, _, err := d.stage.Exec(ctx, d.logger, alerts...)
if err != nil {
logger := d.logger.With("num_alerts", len(alerts), "err", err)
if errors.Is(ctx.Err(), context.Canceled) {
// It is expected for the context to be canceled on
// configuration reload or shutdown. In this case, the
// message should only be logged at the debug level.
logger.Debug("Notify for alerts failed")
} else {
logger.Error("Notify for alerts failed")
}
}
return err == nil
})
}
首先,根据告警 alert
和匹配的路由 route
,通过 getGroupLabels
函数获取用于分组的标签 groupLabels
,并计算其指纹 fp
。然后,从 d.aggrGroupsPerRoute
中查找与该路由 route
对应的聚合组集合 routeGroups
。如果该集合不存在,则创建一个新的空集合。接着,在 routeGroups
中查找是否已存在指纹为 fp
的聚合组 ag
。若存在,直接将告警插入该聚合组 ag.insert(alert)
。若不存在,则创建一个新的聚合组 ag
,使用 newAggrGroup
函数,传入上下文 d.ctx
、分组标签 groupLabels
、路由 route
、超时设置 d.timeout
和日志记录器 d.logger
。创建完成后,将新的聚合组添加到 routeGroups
中,并增加聚合组的统计计数。最后,将告警插入新创建的聚合组,并启动一个新的协程 go ag.run(...)
,该协程会调用 d.stage.Exec
方法,将聚合组中的告警传递给后续的处理阶段(如 Notify
模块)进行处理。通过这样的流程,Dispatcher 实现了将告警准确地分发到相应的聚合组,为后续的告警处理和通知发送做好准备。
3. Notify 模块发送通知
Notify 模块负责将处理后的告警信息发送给相应的接收者,其核心处理流程在 notify/notify.go
文件中定义。Notify 模块通过构建一个 pipeline
来处理告警,pipeline
由多个阶段(Stage
)组成,每个阶段执行特定的操作。
PipelineBuilder
用于构建 pipeline
,其 New
函数如下:
func (pb *PipelineBuilder) New(
receivers map[string][]Integration,
wait func() time.Duration,
inhibitor *inhibit.Inhibitor,
silencer *silence.Silencer,
intervener *timeinterval.Intervener,
marker types.GroupMarker,
notificationLog NotificationLog,
peer Peer,
) RoutingStage {
rs := make(RoutingStage, len(receivers))
ms := NewGossipSettleStage(peer)
is := NewMuteStage(inhibitor, pb.metrics)
tas := NewTimeActiveStage(intervener, marker, pb.metrics)
tms := NewTimeMuteStage(intervener, marker, pb.metrics)
ss := NewMuteStage(silencer, pb.metrics)
for name := range receivers {
st := createReceiverStage(name, receivers[name], wait, notificationLog, pb.metrics)
rs[name] = MultiStage{ms, is, tas, tms, ss, st}
}
pb.metrics.InitializeFor(receivers)
return rs
}
在构建 pipeline
时,首先创建了一些通用的阶段,如 NewGossipSettleStage
用于处理集群节点间的状态同步(确保各个节点对告警处理的一致性),NewMuteStage
(分别基于 inhibitor
和 silencer
)用于实现告警抑制和静默功能,NewTimeMuteStage
和 NewTimeActiveStage
用于根据时间区间对告警进行静音或激活处理。然后,针对每个接收器 name
,通过 createReceiverStage
函数创建一个包含特定接收器处理流程的阶段 st
。这个阶段通常包含 Wait
(等待阶段,用于控制发送频率)、Dedup
(去重阶段,避免重复发送相同告警)、Retry
(重试阶段,确保告警能成功发送)等子阶段。最后,将这些阶段组合成一个 MultiStage
结构,并添加到 rs
中,形成完整的 pipeline
。
以 RetryStage
为例,其执行逻辑如下:
func (r RetryStage) exec(ctx context.Context, l *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
var sent []*types.Alert
// If we shouldn't send notifications for resolved alerts, but there are only
// resolved alerts, report them all as successfully notified (we still want the // notification log to log them for the next run of DedupStage).
if !r.integration.SendResolved() {
firing, ok := FiringAlerts(ctx)
if !ok {
return ctx, nil, errors.New("firing alerts missing")
}
if len(firing) == 0 {
return ctx, alerts, nil
}
for _, a := range alerts {
if a.Status() != model.AlertResolved {
sent = append(sent, a)
}
}
} else {
sent = alerts
}
b := backoff.NewExponentialBackOff()
b.MaxElapsedTime = 0 // Always retry.
tick := backoff.NewTicker(b)
defer tick.Stop()
var (
i = 0
iErr error
)
l = l.With("receiver", r.groupName, "integration", r.integration.String())
if groupKey, ok := GroupKey(ctx); ok {
l = l.With("aggrGroup", groupKey)
}
for {
i++
// Always check the context first to not notify again.
select {
case <-ctx.Done():
if iErr == nil {
iErr = ctx.Err()
if errors.Is(iErr, context.Canceled) {
iErr = NewErrorWithReason(ContextCanceledReason, iErr)
} else if errors.Is(iErr, context.DeadlineExceeded) {
iErr = NewErrorWithReason(ContextDeadlineExceededReason, iErr)
}
}
if iErr != nil {
return ctx, nil, fmt.Errorf("%s/%s: notify retry canceled after %d attempts: %w", r.groupName, r.integration.String(), i, iErr)
}
return ctx, nil, nil
default:
}
select {
case <-tick.C:
now := time.Now()
retry, err := r.integration.Notify(ctx, sent...)
dur := time.Since(now)
r.metrics.notificationLatencySeconds.WithLabelValues(r.labelValues...).Observe(dur.Seconds())
r.metrics.numNotificationRequestsTotal.WithLabelValues(r.labelValues...).Inc()
if err != nil {
r.metrics.numNotificationRequestsFailedTotal.WithLabelValues(r.labelValues...).Inc()
if !retry {
return ctx, alerts, fmt.Errorf("%s/%s: notify retry canceled due to unrecoverable error after %d attempts: %w", r.groupName, r.integration.String(), i, err)
}
if ctx.Err() == nil {
if iErr == nil || err.Error() != iErr.Error() {
// Log the error if the context isn't done and the error isn't the same as before.
l.Warn("Notify attempt failed, will retry later", "attempts", i, "err", err)
}
// Save this error to be able to return the last seen error by an
// integration upon context timeout.
iErr = err
}
} else {
l := l.With("attempts", i, "duration", dur)
if i <= 1 {
l = l.With("alerts", fmt.Sprintf("%v", alerts))
l.Debug("Notify success")
} else {
l.Info("Notify success")
}
return ctx, alerts, nil
}
case <-ctx.Done():
if iErr == nil {
iErr = ctx.Err()
if errors.Is(iErr, context.Canceled) {
iErr = NewErrorWithReason(ContextCanceledReason, iErr)
} else if errors.Is(iErr, context.DeadlineExceeded) {
iErr = NewErrorWithReason(ContextDeadlineExceededReason, iErr)
}
}
if iErr != nil {
return ctx, nil, fmt.Errorf("%s/%s: notify retry canceled after %d attempts: %w", r.groupName, r.integration.String(), i, iErr)
}
return ctx, nil, nil
}
}
}
在 RetryStage
中,使用 backoff.NewExponentialBackOff
创建一个指数退避策略的重试机制。通过 backoff.NewTicker
启动一个定时器 tick
,定时尝试发送告警。每次尝试发送时,调用 r.integration.Notify(ctx, alerts...)
方法,通过具体的接收器(如电子邮件、Slack 等集成方式)发送告警。在整个过程中,如果上下文 ctx
被取消,且存在未处理的错误 iErr
,则返回错误信息。通过这样的 pipeline
处理流程,Notify 模块实现了对告警的去重、按策略发送和重试等功能,确保告警能够准确、可靠地发送到相应的接收者手中 。
五、最后
Alertmanager 作为监控体系中的关键组件,在告警处理方面展现出了强大的功能和高效的实现逻辑。通过对告警去重、分组、路由和抑制等核心功能的深入分析,以及对其源码中关键数据结构和处理流程的解读,我们清晰地认识到它如何在复杂的监控环境中,将海量的告警信息进行有序管理和精准分发。它不仅有效减少了告警噪音,提高了运维人员的工作效率,还确保了关键告警能够及时送达相关人员手中,为保障系统的稳定运行发挥了重要作用。