乔克
乔克
Published on 2025-02-06 / 52 Visits
0
0

使用DaemonSet实现heapdump文件自动化管理

✍ 道路千万条,安全第一条。操作不规范,运维两行泪。

一、引言

1、为什么要获取 heapdump 文件

heapdump 文件是 Java 应用遭遇 OOM 后的诊断报告,记录了某一时刻 JVM 堆中对象的详细使用情况,是 JVM 堆内存的一个快照。通过分析 heapdump 文件,我们可以深入了解到内存中究竟存在哪些对象,它们占用了多少内存空间,以及对象之间的引用关系如何。这对于定位内存泄漏问题至关重要。

2、为什么使用 DaemonSet 实现

之前在 SRE运维笔记公众号中看到一篇文章《运维救星!一键开启 k8s 微服务 OOM heapdump 自动化之旅》,其实现思路通过在应用容器中增加 dump 脚本,然后通过 java 参数 -XX:OnOutOfMemoryError 配置脚本,它的作用是当内存溢出的时候,会调用这个参数配置的脚本做一些后续处理,比如文章中的 dump 脚本,也可以是重启应用的脚本等。

上述方法对应用有一定的侵入性,另外,如果文件太大,会出现容器退出导致上传失败的情况。结合实际情况,准备使用 DaemonSet 部署一个 heapdump-watcher 应用,通过它来监听 heapdump.prof 文件实现自动化管理。

Tips:该方法仅适合将 heapdump.prof 持久化到 K8s 节点的场景。但是具有一定的参考意义。

3、实施前提

该方案需要以下前提:

  • heapdump.prof 文件持久化到 K8s 节点。
  • 持久化的目录具备相同规则,比如:/mnt/logs/<APP_NAME>/logs/heapdump.prof,如果需要避免冲突,目录可以改造成 /mnt/logs/<APP_NAME>/logs/<POD_NAME>heapdump.prof。
  • 具备阿里云 OSS 操作权限。
  • 具备一个可用的企业微信机器人

二、整体思路

image-wwzo.png

OOM事件触发通过 Java 启动参数配置,增加 -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/mnt/logs/heapdump.hprof,当应用触发 OOM,则会在 /mnt/logs 目录下自动生成 heapdump.prof 文件。

我们通过 fsnotify 来监听文件的变化,当 heapdump.prof 生成完后,fsnotify 就会迅速捕捉到这个事件,我们通过阿里云OSS的SDK 实现文件上传,将 heapdump.prof 文件压缩后上传到阿里云OSS。为了节约节点磁盘空间,当 heapdump.prof 文件上传完成后清理本地文件。

为了让相关开发人员了解到新的 heapdump.prof 文件已经生成,我们通过企业微信机器人通知到对应的开发群。

三、具体实现

(1)初始化部分

func init() {  
    // 获取环境  
    env = getEnv("ENV", "prod")  
    var err error  
    watcher, err = fsnotify.NewWatcher()  
    if err != nil {  
       log.Fatalf("Failed to create fsnotify watcher: %v", err)  
    }  
  
    // 加载配置文件  
    config, err = loadConfig(configPath)  
    if err != nil {  
       log.Fatalf("Failed to load config: %v", err)  
    }  
  
    // 初始化OSS客户端  
    ossClient, err := oss.New(config.OSS.Endpoint, config.OSS.AccessID, config.OSS.AccessKey)  
    if err != nil {  
       log.Fatalf("Failed to create OSS client: %v", err)  
    }  
    client, _ = ossClient.Bucket(config.OSS.Bucket)  
  
    if config.WatchPods {  
       // 初始化Kubernetes客户端  
       kubeClient, err = createKubeClient()  
       if err != nil {  
          log.Fatalf("Failed to create Kubernetes client: %v", err)  
       }  
  
       // 获取当前节点的IP  
       nodeIP, err = getNodeIP()  
       if err != nil {  
          log.Fatalf("Failed to get node IP: %v", err)  
       }  
    }  
  
    // 初始化信号通道  
    signalChan = make(chan os.Signal, 1)  
    stopChan = make(chan struct{})  
    signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)  
}

在这段初始化代码中,首先通过 getEnv 函数获取环境变量 ENV 的值,如果未设置则默认为 prod。接着创建一个 fsnotify.Watcher,用于监听文件系统的变化。然后从指定路径 configPath 加载配置文件,配置文件中包含了 OSS、企业微信 Webhook 以及白名单等相关配置信息。

随后,利用配置信息初始化阿里云 OSS 客户端,通过提供的 Endpoint、AccessID 和 AccessKey 创建 ossClient,并获取指定的 Bucket,以便后续进行文件上传操作。

如果配置中 WatchPods 字段为 true,表示会监听 Pod 的变化(因为 Pod 会重建,如果日志目录包含 POD_NAME,重建后就不应该再监听原来 Pod 目录),则会初始化 Kubernetes 客户端。通过 createKubeClient 函数创建 kubeClient,用于与 Kubernetes 集群进行交互。还会获取当前节点的 IP 地址,以便后续监听该节点上的 Pod 变化。

最后,初始化两个通道 signalChanstopChansignalChan 用于接收操作系统发送的信号,如 SIGINT(中断信号,通常由用户按下 Ctrl+C 触发)和 SIGTERM(终止信号,用于正常终止进程),以便程序能够在接收到这些信号时进行优雅退出;stopChan 则用于停止 Informer,当程序接收到终止信号时,通过关闭 stopChan 来通知 Informer 停止工作。

(2)文件监听

func watchFiles() {  
    for {  
       select {  
       case event, ok := <-watcher.Events:  
          if !ok {  
             return  
          }  
          if event.Op&fsnotify.Create == fsnotify.Create {  
             // 检测到新文件创建  
             if strings.HasSuffix(event.Name, "heapdump.prof") {  
                log.Printf("New heapdump file detected: %s", event.Name)  
                // 等待文件写入完成  
                if err := waitForFileCompletion(event.Name); err != nil {  
                   log.Printf("Failed to wait for file completion: %v", err)  
                   continue  
                }  
                // 上传文件到OSS  
                appName := filepath.Base(filepath.Dir(filepath.Dir(event.Name)))  
                err := uploadFileToOSS(event.Name, appName)  
                if err != nil {  
                   log.Printf("Failed to upload file to OSS: %v", err)  
                } else {  
                   log.Printf("File uploaded to OSS successfully: %s", event.Name)  
                   // 发送企业微信告警通知  
                   err = sendWechatAlert(appName)  
                   if err != nil {  
                      log.Printf("Failed to send WeChat alert: %v", err)  
                   }  
                }  
             }  
          }  
       case err, ok := <-watcher.Errors:  
          if !ok {  
             return  
          }  
          log.Printf("Error: %v", err)  
       }  
    }  
}

watchFiles 函数是实现文件监听的核心部分。它通过一个无限循环 for { }select 语句来监听 watcher.Events 通道和 watcher.Errors 通道。

watcher.Events 通道有事件发生时,会检查事件类型是否为文件创建(event.Op&fsnotify.Create == fsnotify.Create)。如果是新文件创建,且文件后缀为 heapdump.prof,则表示检测到了新的 heapdump 文件。

此时,会调用 waitForFileCompletion 函数等待文件写入完成。该函数通过不断检查文件大小是否变化来判断文件是否写入完成,设置了最大检查时长为 30 秒,检查间隔为 2 秒。如果文件在规定时间内大小不再变化,则认为文件写入完成;否则,返回错误并继续监听下一个事件。

文件写入完成后,获取文件所在目录的应用名称,然后调用 uploadFileToOSS 函数将文件上传到 OSS。上传成功后,会调用 sendWechatAlert 函数发送企业微信告警通知,告知相关人员新的 heapdump 文件已生成并上传。

(3)Pod 状态监听

该方法主要是针对 heapdump.prof 所存放的目录有 POD_NAME 变量,希望实现的是当原 Pod 销毁会取消监听原 Pod 目录,当新 Pod 创建会监听新 Pod 目录。

func watchPods() {  
    // 获取当前节点上的Pod列表  
    for _, appName := range config.Whitelist {  
       pods, err := kubeClient.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{  
          LabelSelector: fmt.Sprintf("app=%s", appName),  
          FieldSelector: fmt.Sprintf("spec.nodeName=%s", nodeIP),  
       })  
       if err != nil {  
          log.Printf("Failed to list pods for app %s: %v", appName, err)  
          continue  
       }  
  
       for _, pod := range pods.Items {  
          addPodWatch(appName, pod.Name)  
       }  
    }  
  
    // 监听Pod变化  
    _, controller := cache.NewInformer(  
       &cache.ListWatch{  
          ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {  
             options.FieldSelector = fmt.Sprintf("spec.nodeName=%s", nodeIP)  
             return kubeClient.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), options)  
          },  
          WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {  
             options.FieldSelector = fmt.Sprintf("spec.nodeName=%s", nodeIP)  
             return kubeClient.CoreV1().Pods(metav1.NamespaceAll).Watch(context.TODO(), options)  
          },  
       },  
       &corev1.Pod{},  
       0,  
       cache.ResourceEventHandlerFuncs{  
          AddFunc: func(obj interface{}) {  
             pod := obj.(*corev1.Pod)  
             appName := pod.Labels["app"]  
             if isWhitelisted(appName) {  
                log.Printf("Pod added: %s/%s", pod.Namespace, pod.Name)  
                addPodWatch(appName, pod.Name)  
             }  
          },  
          DeleteFunc: func(obj interface{}) {  
             pod := obj.(*corev1.Pod)  
             appName := pod.Labels["app"]  
             if isWhitelisted(appName) {  
                log.Printf("Pod deleted: %s/%s", pod.Namespace, pod.Name)  
                removePodWatch(appName, pod.Name)  
             }  
          },  
       },  
    )  
    controller.Run(stopChan) // 使用 stopChan 来停止 Informer}

watchPods 函数负责监听 Pod 的变化。首先,遍历配置中的白名单应用名称,通过 Kubernetes 客户端 kubeClient 获取当前节点上属于这些应用的 Pod 列表。使用 LabelSelector 来筛选出特定应用的 Pod,FieldSelector 来指定只获取当前节点上的 Pod。

对于获取到的每个 Pod,调用 addPodWatch 函数为其添加文件监听。addPodWatch 函数会根据应用名称和 Pod 名称构建日志目录路径,并使用 watcher.Add 方法将该目录添加到文件监听列表中,以便后续能及时监听到该 Pod 生成的 heapdump 文件。

然后,通过 cache.NewInformer 创建一个 Informer,用于监听 Pod 的变化。Informer 是 Kubernetes 客户端中的一个重要组件,它通过 ListWatch 机制定期从 Kubernetes API Server 获取 Pod 列表,并监听 Pod 的变化事件。

ListFuncWatchFunc 分别定义了获取 Pod 列表和监听 Pod 变化的方法,都通过 kubeClient.CoreV1().Pods(metav1.NamespaceAll) 来操作所有命名空间下的 Pod,并根据当前节点 IP 进行筛选。

ResourceEventHandlerFuncs 定义了 Informer 在接收到 Pod 添加和删除事件时的处理逻辑。当有新 Pod 添加时,如果该 Pod 的应用名称在白名单中,会调用 addPodWatch 函数为其添加文件监听;当有 Pod 被删除时,如果应用名称在白名单中,会调用 removePodWatch 函数移除对该 Pod 的文件监听。

最后,启动 Informer 并传入 stopChan,当 stopChan 被关闭时,Informer 会停止运行,实现了优雅停止的功能。

(4)文件上传

func uploadFileToOSS(filePath string, appName string) error {  
    file, err := os.Open(filePath)  
    if err != nil {  
       return err  
    }  
    defer file.Close()  
  
    // 创建临时文件用于存储压缩后的文件  
    tempFile, err := os.CreateTemp("", "heapdump-*.zip")  
    if err != nil {  
       return err  
    }  
    defer tempFile.Close()  
    defer os.Remove(tempFile.Name()) // 删除临时文件  
  
    // 创建 zip.Writer    
    zipWriter := zip.NewWriter(tempFile)  
    defer zipWriter.Close()  
  
    // 添加文件到 zip    
    zipFileWriter, err := zipWriter.Create(filepath.Base(filePath))  
    if err != nil {  
       return err  
    }  
    _, err = io.Copy(zipFileWriter, file)  
    if err != nil {  
       return err  
    }  
  
    // 确保 zip 文件写入完成  
    err = zipWriter.Close()  
    if err != nil {  
       return err  
    }  
  
    // 重新打开临时文件用于上传  
    tempFile.Seek(0, 0)  
    tempFileReader := io.Reader(tempFile)  
  
    // 构建上传路径  
    timestamp := time.Now().Format("20060102150405")  
    objectName := fmt.Sprintf("heapdump/%s/heapdump_%s.zip", appName, timestamp)  
  
    // 设置文件元数据
    expires := time.Now().Add(24 * time.Hour) // 设置过期时间为24小时后  
    options := []oss.Option{  
       oss.Expires(expires),  
    }  
  
    err = client.PutObject(objectName, tempFileReader, options...)  
    if err != nil {  
       return err  
    }  
  
    // 生成预签名URL  
    ossURL, err = client.SignURL(objectName, oss.HTTPGet, expires.Unix()-time.Now().Unix())  
    if err != nil {  
       log.Fatalf("Failed to generate presigned URL: %v", err)  
    }  
  
    // 文件上传成功后,删除本地文件  
    log.Printf("Deleting local file: %s", filePath)  
    if err := os.Remove(filePath); err != nil {  
       log.Printf("Failed to delete local file: %v", err)  
    }  
    return nil  
}

这一步先将 heapdump.prof 进行 zip 压缩,然后再将其上传到 OSS,上传成功后删除本地文件。

(5)发送通知

func sendWechatAlert(appName string) error {  
    // 构建 Markdown 格式的消息  
    markdownContent := fmt.Sprintf(`# JAVA OOM DUMP 文件生成  
> 应用:%s  
> 环境:%s  
> 文件:[下载地址](%s)  
> *Tips*: 文件只保留1天,请及时下载`, appName, env, ossURL)  
  
    payload := map[string]interface{}{  
       "msgtype": "markdown",  
       "markdown": map[string]string{  
          "content": markdownContent,  
       },  
    }  
  
    _, body, errs := gorequest.New().Post(config.Wechat.WebhookURL).Send(payload).End()  
    if errs != nil {  
       return fmt.Errorf("failed to send WeChat alert: %v", errs)  
    }  
    log.Printf("WeChat alert response: %s", body)  
    return nil  
}

该步骤将产生 heapdump 的信息发送到对应的告警群。

四、部署验证

(1)制作镜像

将应用打包成 Docker 镜像。

FROM golang:1.21-alpine AS builder  
WORKDIR /app  
COPY go.mod go.sum ./  
RUN go mod download  
COPY . .  
RUN CGO_ENABLED=0 GOOS=linux go build -o /heapdump-watcher  
  
FROM alpine:3.18  
RUN apk add --no-cache ca-certificates  
WORKDIR /app  
COPY --from=builder /heapdump-watcher ./heapdump-watcher  
CMD ["/heapdump-watcher"]

(2)在 K8s 中部署应用

apiVersion: v1  
kind: ServiceAccount  
metadata:  
  name: heapdump-watcher  
  namespace: default  
---  
apiVersion: rbac.authorization.k8s.io/v1  
kind: Role  
metadata:  
  namespace: default  
  name: heapdump-watcher-role  
rules:  
  - apiGroups: [""]  
    resources: ["pods"]  
    verbs: ["get", "list", "watch"]  
---  
apiVersion: v1  
kind: ConfigMap  
metadata:  
  name: heapdump-config  
  namespace: default  
data:  
  config.yaml: |  
    oss:  
      endpoint: your-oss-endpoint  
      bucket: your-oss-bucket  
      accessID: your-oss-access-id  
      accessKey: your-oss-access-key  
  
    wechat:  
      webhookURL: your-wechat-webhook-url  
  
    whitelist:  
      - app1  
      - app2  
      - app3  
    
    watchPods: false  # 控制是否监听 Pod 变化
---  
apiVersion: apps/v1  
kind: DaemonSet  
metadata:  
  name: heapdump-watcher  
  namespace: default  
spec:  
  selector:  
    matchLabels:  
      app: heapdump-watcher  
  template:  
    metadata:  
      labels:  
        app: heapdump-watcher  
    spec:  
      serviceAccountName: heapdump-watcher  
      containers:  
        - name: heapdump-watcher  
          image: your-docker-image:latest  
          volumeMounts:  
            - name: logs  
              mountPath: /mnt/logs  
              readOnly: false  
            - name: config  
              mountPath: /app/config.yaml  
              subPath: config.yaml  
              readOnly: true  
          env:  
            - name: NODE_NAME  
              valueFrom:  
                fieldRef:  
                  fieldPath: spec.nodeName  
            - name: ENV  
              value: prod  
      volumes:  
        - name: logs  
          hostPath:  
            path: /mnt/logs  
            type: Directory  
        - name: config  
          configMap:  
            name: heapdump-config  
            items:  
              - key: config.yaml  
                path: config.yaml

(3)验证

当应用产生告警后会通知到对应的企业微信,如下:

image-tnqd.png

五、最后

当前功能已经初步实现,但仍有许多可以优化和扩展的方向。可以考虑扩展支持更多类型的云存储,如腾讯云 COS、AWS S3 等,以满足不同用户的需求。这样一来,用户可以根据自己的实际情况和偏好,选择最适合自己的云存储服务,提高方案的通用性和灵活性。

另外在通知内容和方式上,可以进一步丰富通知内容,不仅包含应用名称、环境和文件下载链接,还可以增加更多关于内存问题的详细信息,如内存使用峰值、OOM 发生的时间点等。在通知方式上,可以增加对其他通信工具的支持,如钉钉、飞书等,让用户能够根据自己团队的使用习惯选择合适的通知方式,确保通知能够及时、准确地传达给相关人员。

还可以引入更智能的分析功能,在上传 heapdump 文件后,自动对文件进行初步分析,提取关键信息,如内存泄漏的疑似对象、内存占用过高的类等,并将分析结果一并通知给相关人员。这样可以帮助开发人员更快地定位问题,提高问题解决的效率,为 Java 应用的稳定运行提供更强大的支持。


Comment