✍ 道路千万条,安全第一条。操作不规范,运维两行泪。
一、引言
1、为什么要获取 heapdump 文件
heapdump
文件是 Java
应用遭遇 OOM
后的诊断报告,记录了某一时刻 JVM 堆中对象的详细使用情况,是 JVM 堆内存的一个快照。通过分析 heapdump 文件,我们可以深入了解到内存中究竟存在哪些对象,它们占用了多少内存空间,以及对象之间的引用关系如何。这对于定位内存泄漏问题至关重要。
2、为什么使用 DaemonSet 实现
之前在 SRE运维笔记
公众号中看到一篇文章《运维救星!一键开启 k8s 微服务 OOM heapdump 自动化之旅》,其实现思路通过在应用容器中增加 dump 脚本,然后通过 java 参数 -XX:OnOutOfMemoryError
配置脚本,它的作用是当内存溢出的时候,会调用这个参数配置的脚本做一些后续处理,比如文章中的 dump 脚本,也可以是重启应用的脚本等。
上述方法对应用有一定的侵入性,另外,如果文件太大,会出现容器退出导致上传失败的情况。结合实际情况,准备使用 DaemonSet 部署一个 heapdump-watcher
应用,通过它来监听 heapdump.prof
文件实现自动化管理。
Tips:该方法仅适合将
heapdump.prof
持久化到K8s
节点的场景。但是具有一定的参考意义。
3、实施前提
该方案需要以下前提:
heapdump.prof
文件持久化到K8s
节点。- 持久化的目录具备相同规则,比如:/mnt/logs/<APP_NAME>/logs/heapdump.prof,如果需要避免冲突,目录可以改造成 /mnt/logs/<APP_NAME>/logs/<POD_NAME>heapdump.prof。
- 具备阿里云 OSS 操作权限。
- 具备一个可用的
企业微信机器人
。
二、整体思路
OOM事件触发
通过 Java 启动参数配置,增加 -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/mnt/logs/heapdump.hprof
,当应用触发 OOM
,则会在 /mnt/logs
目录下自动生成 heapdump.prof
文件。
我们通过 fsnotify
来监听文件的变化,当 heapdump.prof
生成完后,fsnotify 就会迅速捕捉到这个事件,我们通过阿里云OSS的SDK
实现文件上传,将 heapdump.prof
文件压缩后上传到阿里云OSS
。为了节约节点磁盘空间,当 heapdump.prof
文件上传完成后清理本地文件。
为了让相关开发人员了解到新的 heapdump.prof
文件已经生成,我们通过企业微信机器人
通知到对应的开发群。
三、具体实现
(1)初始化部分
func init() {
// 获取环境
env = getEnv("ENV", "prod")
var err error
watcher, err = fsnotify.NewWatcher()
if err != nil {
log.Fatalf("Failed to create fsnotify watcher: %v", err)
}
// 加载配置文件
config, err = loadConfig(configPath)
if err != nil {
log.Fatalf("Failed to load config: %v", err)
}
// 初始化OSS客户端
ossClient, err := oss.New(config.OSS.Endpoint, config.OSS.AccessID, config.OSS.AccessKey)
if err != nil {
log.Fatalf("Failed to create OSS client: %v", err)
}
client, _ = ossClient.Bucket(config.OSS.Bucket)
if config.WatchPods {
// 初始化Kubernetes客户端
kubeClient, err = createKubeClient()
if err != nil {
log.Fatalf("Failed to create Kubernetes client: %v", err)
}
// 获取当前节点的IP
nodeIP, err = getNodeIP()
if err != nil {
log.Fatalf("Failed to get node IP: %v", err)
}
}
// 初始化信号通道
signalChan = make(chan os.Signal, 1)
stopChan = make(chan struct{})
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
}
在这段初始化代码中,首先通过 getEnv
函数获取环境变量 ENV
的值,如果未设置则默认为 prod
。接着创建一个 fsnotify.Watcher
,用于监听文件系统的变化。然后从指定路径 configPath
加载配置文件,配置文件中包含了 OSS、企业微信 Webhook 以及白名单等相关配置信息。
随后,利用配置信息初始化阿里云 OSS 客户端,通过提供的 Endpoint、AccessID 和 AccessKey 创建 ossClient,并获取指定的 Bucket,以便后续进行文件上传操作。
如果配置中 WatchPods
字段为 true
,表示会监听 Pod 的变化(因为 Pod 会重建,如果日志目录包含 POD_NAME
,重建后就不应该再监听原来 Pod 目录),则会初始化 Kubernetes
客户端。通过 createKubeClient
函数创建 kubeClient
,用于与 Kubernetes 集群进行交互。还会获取当前节点的 IP 地址,以便后续监听该节点上的 Pod 变化。
最后,初始化两个通道 signalChan
和 stopChan
。signalChan
用于接收操作系统发送的信号,如 SIGINT(中断信号,通常由用户按下 Ctrl+C 触发)和 SIGTERM(终止信号,用于正常终止进程),以便程序能够在接收到这些信号时进行优雅退出;stopChan
则用于停止 Informer,当程序接收到终止信号时,通过关闭 stopChan 来通知 Informer 停止工作。
(2)文件监听
func watchFiles() {
for {
select {
case event, ok := <-watcher.Events:
if !ok {
return
}
if event.Op&fsnotify.Create == fsnotify.Create {
// 检测到新文件创建
if strings.HasSuffix(event.Name, "heapdump.prof") {
log.Printf("New heapdump file detected: %s", event.Name)
// 等待文件写入完成
if err := waitForFileCompletion(event.Name); err != nil {
log.Printf("Failed to wait for file completion: %v", err)
continue
}
// 上传文件到OSS
appName := filepath.Base(filepath.Dir(filepath.Dir(event.Name)))
err := uploadFileToOSS(event.Name, appName)
if err != nil {
log.Printf("Failed to upload file to OSS: %v", err)
} else {
log.Printf("File uploaded to OSS successfully: %s", event.Name)
// 发送企业微信告警通知
err = sendWechatAlert(appName)
if err != nil {
log.Printf("Failed to send WeChat alert: %v", err)
}
}
}
}
case err, ok := <-watcher.Errors:
if !ok {
return
}
log.Printf("Error: %v", err)
}
}
}
watchFiles
函数是实现文件监听的核心部分。它通过一个无限循环 for { }
和 select
语句来监听 watcher.Events
通道和 watcher.Errors
通道。
当 watcher.Events
通道有事件发生时,会检查事件类型是否为文件创建(event.Op&fsnotify.Create == fsnotify.Create
)。如果是新文件创建,且文件后缀为 heapdump.prof
,则表示检测到了新的 heapdump 文件。
此时,会调用 waitForFileCompletion
函数等待文件写入完成。该函数通过不断检查文件大小是否变化来判断文件是否写入完成,设置了最大检查时长为 30 秒,检查间隔为 2 秒。如果文件在规定时间内大小不再变化,则认为文件写入完成;否则,返回错误并继续监听下一个事件。
文件写入完成后,获取文件所在目录的应用名称,然后调用 uploadFileToOSS
函数将文件上传到 OSS。上传成功后,会调用 sendWechatAlert
函数发送企业微信告警通知,告知相关人员新的 heapdump 文件已生成并上传。
(3)Pod 状态监听
该方法主要是针对 heapdump.prof
所存放的目录有 POD_NAME
变量,希望实现的是当原 Pod
销毁会取消监听原 Pod
目录,当新 Pod
创建会监听新 Pod
目录。
func watchPods() {
// 获取当前节点上的Pod列表
for _, appName := range config.Whitelist {
pods, err := kubeClient.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{
LabelSelector: fmt.Sprintf("app=%s", appName),
FieldSelector: fmt.Sprintf("spec.nodeName=%s", nodeIP),
})
if err != nil {
log.Printf("Failed to list pods for app %s: %v", appName, err)
continue
}
for _, pod := range pods.Items {
addPodWatch(appName, pod.Name)
}
}
// 监听Pod变化
_, controller := cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = fmt.Sprintf("spec.nodeName=%s", nodeIP)
return kubeClient.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = fmt.Sprintf("spec.nodeName=%s", nodeIP)
return kubeClient.CoreV1().Pods(metav1.NamespaceAll).Watch(context.TODO(), options)
},
},
&corev1.Pod{},
0,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*corev1.Pod)
appName := pod.Labels["app"]
if isWhitelisted(appName) {
log.Printf("Pod added: %s/%s", pod.Namespace, pod.Name)
addPodWatch(appName, pod.Name)
}
},
DeleteFunc: func(obj interface{}) {
pod := obj.(*corev1.Pod)
appName := pod.Labels["app"]
if isWhitelisted(appName) {
log.Printf("Pod deleted: %s/%s", pod.Namespace, pod.Name)
removePodWatch(appName, pod.Name)
}
},
},
)
controller.Run(stopChan) // 使用 stopChan 来停止 Informer}
watchPods
函数负责监听 Pod 的变化。首先,遍历配置中的白名单应用名称,通过 Kubernetes 客户端 kubeClient
获取当前节点上属于这些应用的 Pod 列表。使用 LabelSelector
来筛选出特定应用的 Pod,FieldSelector
来指定只获取当前节点上的 Pod。
对于获取到的每个 Pod,调用 addPodWatch
函数为其添加文件监听。addPodWatch
函数会根据应用名称和 Pod 名称构建日志目录路径,并使用 watcher.Add
方法将该目录添加到文件监听列表中,以便后续能及时监听到该 Pod 生成的 heapdump 文件。
然后,通过 cache.NewInformer
创建一个 Informer,用于监听 Pod 的变化。Informer
是 Kubernetes 客户端中的一个重要组件,它通过 ListWatch
机制定期从 Kubernetes API Server 获取 Pod 列表,并监听 Pod 的变化事件。
ListFunc
和 WatchFunc
分别定义了获取 Pod 列表和监听 Pod 变化的方法,都通过 kubeClient.CoreV1().Pods(metav1.NamespaceAll)
来操作所有命名空间下的 Pod,并根据当前节点 IP 进行筛选。
ResourceEventHandlerFuncs
定义了 Informer 在接收到 Pod 添加和删除事件时的处理逻辑。当有新 Pod 添加时,如果该 Pod 的应用名称在白名单中,会调用 addPodWatch
函数为其添加文件监听;当有 Pod 被删除时,如果应用名称在白名单中,会调用 removePodWatch
函数移除对该 Pod 的文件监听。
最后,启动 Informer 并传入 stopChan
,当 stopChan
被关闭时,Informer 会停止运行,实现了优雅停止的功能。
(4)文件上传
func uploadFileToOSS(filePath string, appName string) error {
file, err := os.Open(filePath)
if err != nil {
return err
}
defer file.Close()
// 创建临时文件用于存储压缩后的文件
tempFile, err := os.CreateTemp("", "heapdump-*.zip")
if err != nil {
return err
}
defer tempFile.Close()
defer os.Remove(tempFile.Name()) // 删除临时文件
// 创建 zip.Writer
zipWriter := zip.NewWriter(tempFile)
defer zipWriter.Close()
// 添加文件到 zip
zipFileWriter, err := zipWriter.Create(filepath.Base(filePath))
if err != nil {
return err
}
_, err = io.Copy(zipFileWriter, file)
if err != nil {
return err
}
// 确保 zip 文件写入完成
err = zipWriter.Close()
if err != nil {
return err
}
// 重新打开临时文件用于上传
tempFile.Seek(0, 0)
tempFileReader := io.Reader(tempFile)
// 构建上传路径
timestamp := time.Now().Format("20060102150405")
objectName := fmt.Sprintf("heapdump/%s/heapdump_%s.zip", appName, timestamp)
// 设置文件元数据
expires := time.Now().Add(24 * time.Hour) // 设置过期时间为24小时后
options := []oss.Option{
oss.Expires(expires),
}
err = client.PutObject(objectName, tempFileReader, options...)
if err != nil {
return err
}
// 生成预签名URL
ossURL, err = client.SignURL(objectName, oss.HTTPGet, expires.Unix()-time.Now().Unix())
if err != nil {
log.Fatalf("Failed to generate presigned URL: %v", err)
}
// 文件上传成功后,删除本地文件
log.Printf("Deleting local file: %s", filePath)
if err := os.Remove(filePath); err != nil {
log.Printf("Failed to delete local file: %v", err)
}
return nil
}
这一步先将 heapdump.prof
进行 zip
压缩,然后再将其上传到 OSS
,上传成功后删除本地文件。
(5)发送通知
func sendWechatAlert(appName string) error {
// 构建 Markdown 格式的消息
markdownContent := fmt.Sprintf(`# JAVA OOM DUMP 文件生成
> 应用:%s
> 环境:%s
> 文件:[下载地址](%s)
> *Tips*: 文件只保留1天,请及时下载`, appName, env, ossURL)
payload := map[string]interface{}{
"msgtype": "markdown",
"markdown": map[string]string{
"content": markdownContent,
},
}
_, body, errs := gorequest.New().Post(config.Wechat.WebhookURL).Send(payload).End()
if errs != nil {
return fmt.Errorf("failed to send WeChat alert: %v", errs)
}
log.Printf("WeChat alert response: %s", body)
return nil
}
该步骤将产生 heapdump
的信息发送到对应的告警群。
四、部署验证
(1)制作镜像
将应用打包成 Docker
镜像。
FROM golang:1.21-alpine AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -o /heapdump-watcher
FROM alpine:3.18
RUN apk add --no-cache ca-certificates
WORKDIR /app
COPY --from=builder /heapdump-watcher ./heapdump-watcher
CMD ["/heapdump-watcher"]
(2)在 K8s 中部署应用
apiVersion: v1
kind: ServiceAccount
metadata:
name: heapdump-watcher
namespace: default
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
namespace: default
name: heapdump-watcher-role
rules:
- apiGroups: [""]
resources: ["pods"]
verbs: ["get", "list", "watch"]
---
apiVersion: v1
kind: ConfigMap
metadata:
name: heapdump-config
namespace: default
data:
config.yaml: |
oss:
endpoint: your-oss-endpoint
bucket: your-oss-bucket
accessID: your-oss-access-id
accessKey: your-oss-access-key
wechat:
webhookURL: your-wechat-webhook-url
whitelist:
- app1
- app2
- app3
watchPods: false # 控制是否监听 Pod 变化
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: heapdump-watcher
namespace: default
spec:
selector:
matchLabels:
app: heapdump-watcher
template:
metadata:
labels:
app: heapdump-watcher
spec:
serviceAccountName: heapdump-watcher
containers:
- name: heapdump-watcher
image: your-docker-image:latest
volumeMounts:
- name: logs
mountPath: /mnt/logs
readOnly: false
- name: config
mountPath: /app/config.yaml
subPath: config.yaml
readOnly: true
env:
- name: NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
- name: ENV
value: prod
volumes:
- name: logs
hostPath:
path: /mnt/logs
type: Directory
- name: config
configMap:
name: heapdump-config
items:
- key: config.yaml
path: config.yaml
(3)验证
当应用产生告警后会通知到对应的企业微信,如下:
五、最后
当前功能已经初步实现,但仍有许多可以优化和扩展的方向。可以考虑扩展支持更多类型的云存储,如腾讯云 COS、AWS S3 等,以满足不同用户的需求。这样一来,用户可以根据自己的实际情况和偏好,选择最适合自己的云存储服务,提高方案的通用性和灵活性。
另外在通知内容和方式上,可以进一步丰富通知内容,不仅包含应用名称、环境和文件下载链接,还可以增加更多关于内存问题的详细信息,如内存使用峰值、OOM 发生的时间点等。在通知方式上,可以增加对其他通信工具的支持,如钉钉、飞书等,让用户能够根据自己团队的使用习惯选择合适的通知方式,确保通知能够及时、准确地传达给相关人员。
还可以引入更智能的分析功能,在上传 heapdump 文件后,自动对文件进行初步分析,提取关键信息,如内存泄漏的疑似对象、内存占用过高的类等,并将分析结果一并通知给相关人员。这样可以帮助开发人员更快地定位问题,提高问题解决的效率,为 Java 应用的稳定运行提供更强大的支持。