开发 K8s GPT 故障诊断工具 原创
✍ 道路千万条,安全第一条。操作不规范,运维两行泪。
前面我们介绍了 《开发 K8s Chat 命令行工具》,实现了通过和 Kubernetes 进行交互的方式进行运维,虽然文章中所描述的功能比较简单,但是可以以此进行扩展,丰富功能。
那本章,我们将在 《开发 K8s Chat 命令行工具》的基础之上,增加 Kubernetes 故障诊断工具,其主要功能点是:
- 获取集群的 event 时间,特别关注 warning 级别事件
- 然后进入对应的 pod 获取日志
- 大模型结合事件和日志进行分析,得出解决问题的建议
当然,这里也只是起到一个抛砖引玉的作用,提供简单的思路,可以自行扩展。
开发过程
(1)首先使用 cobra-cli
新增一个 analyze
命令
cobra-cli add analyze
(2)然后在 analyze
下面添加一个子命令 event
,专门用于分析事件
cobra-cli add event -p 'analyzeCmd'
(3)设计一个方法 getPodEventsAndLogs
用于获取 K8s 的事件和日志
// int64Ptr 辅助函数,用于创建 int64 指针
func int64Ptr(i int64) *int64 {
return &i
}
func getPodEventsAndLogs() (map[string][]string, error) {
// 创建 Kubernetes 客户端
clientGo, err := utils.NewClientGo(kubeconfig)
if err != nil {
return nil, fmt.Errorf("创建 Kubernetes 客户端失败: %v", err)
}
// 存储每个 Pod 的事件和日志信息
result := make(map[string][]string)
processedPods := make(map[string]bool) // 避免重复处理同一个 Pod
// 获取 Warning 级别的事件
events, err := clientGo.Clientset.CoreV1().Events("").List(context.TODO(), metav1.ListOptions{
FieldSelector: "type=Warning",
})
if err != nil {
return nil, fmt.Errorf("获取事件失败: %v", err)
}
for _, event := range events.Items {
// 只处理 Pod 相关的事件
if event.InvolvedObject.Kind != "Pod" {
continue
}
podName := event.InvolvedObject.Name
namespace := event.InvolvedObject.Namespace
message := event.Message
podKey := fmt.Sprintf("%s/%s", namespace, podName)
// 避免重复处理同一个 Pod
if processedPods[podKey] {
// 如果已经处理过这个 Pod,只添加新的事件信息
result[podKey] = append(result[podKey], fmt.Sprintf("Additional Event: %s", message))
continue
}
// 标记为已处理
processedPods[podKey] = true
// 获取 Pod 日志(添加限制和更好的错误处理)
logOptions := &corev1.PodLogOptions{
TailLines: int64Ptr(100), // 限制日志行数为最后100行
LimitBytes: int64Ptr(1024 * 1024), // 限制日志大小为1MB
}
req := clientGo.Clientset.CoreV1().Pods(namespace).GetLogs(podName, logOptions)
podLogs, err := req.Stream(context.TODO())
if err != nil {
// 即使获取日志失败,也保存事件信息
result[podKey] = append(result[podKey], fmt.Sprintf("Event Message: %s", message))
result[podKey] = append(result[podKey], fmt.Sprintf("Namespace: %s", namespace))
result[podKey] = append(result[podKey], fmt.Sprintf("日志获取失败: %v", err))
continue
}
// 使用匿名函数确保资源及时释放
func() {
defer podLogs.Close()
buf := new(bytes.Buffer)
_, err = buf.ReadFrom(podLogs)
if err != nil {
// 日志读取失败,但仍保存事件信息
result[podKey] = append(result[podKey], fmt.Sprintf("Event Message: %s", message))
result[podKey] = append(result[podKey], fmt.Sprintf("Namespace: %s", namespace))
result[podKey] = append(result[podKey], fmt.Sprintf("日志读取失败: %v", err))
return
}
// 成功获取日志,保存完整信息
result[podKey] = append(result[podKey], fmt.Sprintf("Event Message: %s", message))
result[podKey] = append(result[podKey], fmt.Sprintf("Namespace: %s", namespace))
result[podKey] = append(result[podKey], fmt.Sprintf("Logs:\n%s", buf.String()))
}()
}
return result, nil
}
我们使用一个 map[string][]string
来保存 pod 的事件和日志信息,然后通过 client-go
获取 warning
级别的事件,最后过滤需要的 pod
事件以及 pod
相关信息,然后继续通过 client-go
获取对应 pod
的日志,然后把这些信息放到 map
中。
(4)设计一个 sendToChatGPT
的方法,接受 pod
的事件和日志信息,然后通过 AI
对其进行分析
// sendToChatGPT 函数接受 podInfo map,发送给 OpenAI 的 ChatGPT 获取诊断建议
func sendToChatGPT(podInfo map[string][]string) (string, error) {
// 检查输入数据
if len(podInfo) == 0 {
return "未发现任何 Pod Warning 事件", nil
}
// 创建 OpenAI 客户端
client, err := utils.NewOpenAIClient()
if err != nil {
return "", fmt.Errorf("创建 OpenAI 客户端失败: %v", err)
}
// 构建结构化的信息字符串
combinedInfo := buildPodInfoString(podInfo)
// 输出调试信息(可选,生产环境可移除)
fmt.Printf("正在分析 %d 个 Pod 的问题...\n", len(podInfo))
fmt.Println("详细信息:")
fmt.Println(combinedInfo)
fmt.Println("正在请求 AI 分析...")
// 构造优化的 ChatGPT 请求消息
messages := []openai.ChatCompletionMessage{
{
Role: openai.ChatMessageRoleSystem,
Content: `您是一位资深的 Kubernetes 专家和故障诊断专家。您的任务是:
1. 分析 Pod 的 Warning 事件和日志
2. 识别根本原因
3. 提供具体的、可操作的解决方案
4. 优先推荐命令行解决方案,必要时提供 YAML 配置
5. 按问题严重程度排序建议`,
},
{
Role: openai.ChatMessageRoleUser,
Content: fmt.Sprintf(`请分析以下 Kubernetes Pod 问题:
%s
请提供:
1. 问题根本原因分析
2. 具体解决步骤(优先使用 kubectl 命令)
3. 预防措施建议
4. 如果需要 YAML 配置,请提供完整示例
请确保建议具体可操作,避免泛泛而谈。`, combinedInfo),
},
}
// 请求 ChatGPT 获取建议
resp, err := client.Client.CreateChatCompletion(
context.TODO(),
openai.ChatCompletionRequest{
Model: openai.GPT4oMini,
Messages: messages,
MaxTokens: 2000, // 限制响应长度
Temperature: 0.1, // 降低随机性,提高一致性
},
)
if err != nil {
return "", fmt.Errorf("调用 OpenAI API 失败: %v", err)
}
// 验证响应
if len(resp.Choices) == 0 {
return "", fmt.Errorf("OpenAI 返回空响应")
}
responseText := resp.Choices[0].Message.Content
if responseText == "" {
return "AI 分析完成,但未返回具体建议", nil
}
return responseText, nil
}
// buildPodInfoString 构建格式化的 Pod 信息字符串
func buildPodInfoString(podInfo map[string][]string) string {
var builder strings.Builder
builder.WriteString("发现以下 Pod Warning 事件及其日志:\n\n")
podCount := 0
for podKey, info := range podInfo {
podCount++
builder.WriteString(fmt.Sprintf("=== Pod %d: %s ===\n", podCount, podKey))
// 分类显示信息
for _, line := range info {
if strings.HasPrefix(line, "Event Message:") {
builder.WriteString(fmt.Sprintf("🚨 %s\n", line))
} else if strings.HasPrefix(line, "Namespace:") {
builder.WriteString(fmt.Sprintf("📍 %s\n", line))
} else if strings.HasPrefix(line, "Logs:") {
builder.WriteString(fmt.Sprintf("📋 %s\n", line))
} else if strings.HasPrefix(line, "Additional Event:") {
builder.WriteString(fmt.Sprintf("🔄 %s\n", line))
} else {
builder.WriteString(fmt.Sprintf("%s\n", line))
}
}
builder.WriteString("\n")
}
return builder.String()
}
和 AI
的对话主要就是 prompt
的设计,然后把具体的参数传进去即可,没有特别的地方。
(5)使用 k8scopilot.exe analyze event
进行分析验证
分析结果如下:
正在请求 AI 分析...
根据您提供的 Kubernetes Pod 日志和事件信息,以下是对问题的根本原因分析、解决步骤、预防措施建议以及必要的 YAML 配置示例。
### 1. 问题根本原因分析
- **资源不足**:
- 多个 Pod 报告了 `ephemeral-storage` 不足的问题,导致 Pod 无法正常运行或重启。
- 许多 Pod 由于节点的 `DiskPressure` 状态而无法调度或运行。
- **网络连接问题**:
- 多个 Pod 的就绪探针和存活探针失败,显示连接超时或连接被拒绝,表明服务未能在指定端口上启动。
- **依赖注入失败**:
- 一些 Pod 报告了 Spring Boot 应用程序的依赖注入失败,特别是与 Dubbo 相关的服务未能正确配置。
- **镜像拉取失败**:
- 一些 Pod 报告了 `ImagePullBackOff`,这通常是由于镜像不存在或访问权限问题。
### 2. 具体解决步骤
#### 2.1 解决资源不足问题
- **清理节点上的不必要的资源**:
kubectl delete pod --all -n kube-system
kubectl delete pod --all -n log
- **增加节点的存储资源**:
如果可能,增加节点的存储容量,或者添加新的节点。
- **调整 Pod 的资源请求和限制**:
确保 Pod 的 `ephemeral-storage` 请求和限制设置合理。可以通过以下命令编辑 Pod 的 YAML 配置:
```bash
kubectl edit deployment <deployment-name> -n <namespace>
在容器部分添加:
resources:
requests:
ephemeral-storage: "100Mi"
limits:
ephemeral-storage: "200Mi"
#### 2.2 解决网络连接问题
- **检查服务是否正常运行**:
确保 Pod 中的服务在预期的端口上启动。可以通过以下命令查看 Pod 的状态:
```bash
kubectl get pods -n <namespace>
- **查看 Pod 日志**:
kubectl logs <pod-name> -n <namespace>
- **增加探针的超时时间**:
如果服务启动较慢,可以增加就绪探针和存活探针的超时时间:
readinessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 30
periodSeconds: 10
timeoutSeconds: 5
#### 2.3 解决依赖注入失败
- **检查 Dubbo 配置**:
确保 Dubbo 的注册中心配置正确。可以在应用的配置文件中添加:
dubbo:
registry:
address: "zookeeper://localhost:2181"
- **重启相关 Pod**:
```bash
kubectl rollout restart deployment <deployment-name> -n <namespace>
#### 2.4 解决镜像拉取失败
- **检查镜像是否存在**:
确保指定的镜像在容器注册中心中存在,并且访问权限正确。
- **更新镜像标签**:
如果镜像标签不正确,可以更新为正确的标签:
kubectl set image deployment/<deployment-name> <container-name>=<new-image>:<tag> -n <namespace>
### 3. 预防措施建议
- **监控资源使用情况**:
使用 Kubernetes 的监控工具(如 Prometheus 和 Grafana)监控节点和 Pod 的资源使用情况,及时发现并解决资源不足的问题。
- **设置合理的资源请求和限制**:
在 Pod 的 YAML 配置中设置合理的资源请求和限制,以避免资源争用。
- **定期清理不必要的资源**:
定期检查和清理不再使用的 Pod 和镜像,以释放存储空间。
从上面的分析结果可以看到基本能够给出比较准确的建议。
当然,这里只是诊断问题,还可以对其功能进行扩展,比如:
故障自愈:
- 结合 Function Calling 实现自动修复简单问题
增强分析:
- 增加更多诊断数据源(metrics、节点状态等)
- 实现历史问题匹配和知识库
可视化:
- 生成 HTML 格式的诊断报告
- 支持问题严重程度分级展示
最后
本文在《开发 K8s Chat 命令行工具》的基础上,进一步实现了 Kubernetes 故障诊断功能,核心思路是通过工具获取集群中 Warning 级别的事件及对应 Pod 的日志,再借助大模型分析并输出解决方案,为运维工作提供了便捷的故障排查途径。
具体而言,开发过程通过 cobra-cli
新增 analyze
命令及子命令 event
,构建 getPodEventsAndLogs
方法获取相关事件与日志,设计 sendToChatGPT
方法将信息传入大模型进行分析。