开发一个 K8s Chat 命令行工具 原创
✍ 道路千万条,安全第一条。操作不规范,运维两行泪。
在前面我们介绍了[[03.大模型入门实战]]和 [[04.Agent入门实战]],了解了 AI 开发的基本流程,本章节我们将使用讨论如何将 Kubernetes 和 AI 结合起来。
本文主要从以下几个方面进行介绍:
- 怎么和 Kubernetes 进行交互
- Cobra 工具库介绍
- 开发 Chat K8s 命令行工具
Tips:文章只是起到抛砖引玉的作用,做大做强还需各位自己努力。
怎么和 Kubernetes 进行交互
Kubernetes 集群是云原生时代中最重要的基础设施,它里面运行着成千上万的 Pods,在日常的运维工作中,SRE 常用 kubectl
命令与它进行交互。
kubectl
是官方开发的客户端,已经将所有常用的操作集成到里面了。但是, 如果我们要通过接口与 Kubernetes 进行交互,就需要使用官方的一个核心库 Client-go
。
client-go
是 Kubernetes 官方提供的 Go 语言客户端库,用于与 Kubernetes API Server 进行交互。它让你可以通过编程的方式创建、更新、删除和查询 Kubernetes 资源(如 Pod、Deployment、Service 等),是开发 Kubernetes 控制器、Operator、自定义调度器或运维工具的核心依赖。
client-go
的核心组件主要有:
- Clientset:最常用的客户端集合,包含对 core、apps、networking 等 API 访问。
- Informer:监听资源变化,如 add、update、delete,避免频繁轮询。
- Lister:快速从本地缓存读取资源,主要是配合 Informer 使用。
- Workqueue:处理异步任务队列,常用于控制器中
- ...
下面我们举一个简单的例子获取所有 Pod 列表。
(1)初始化一个项目
mkdir k8s-pod-list && cd k8s-pod-list
go mod init k8s-pod-list
(2)安装 client-go
go get k8s.io/client-go/kubernetes
go get k8s.io/client-go/tools/clientcmd
go get k8s.io/client-go/rest
(3)编写代码
package main
import (
"context"
"fmt"
"log"
"path/filepath"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
)
func main() {
// 1. 构建 kubeconfig 路径(默认 ~/.kube/config)
var kubeconfig string
if home := homedir.HomeDir(); home != "" {
kubeconfig = filepath.Join(home, ".kube", "config")
}
// 2. 加载 kubeconfig 文件,生成配置
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
log.Fatal(err)
}
// 3. 创建 Kubernetes 客户端
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Fatal(err)
}
// 4. 获取所有 Pod
pods, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{})
if err != nil {
log.Fatal(err)
}
// 5. 打印每个 Pod 的命名空间和名称
fmt.Printf("Found %d pods:\n", len(pods.Items))
for _, pod := range pods.Items {
fmt.Printf("Namespace: %s, Pod: %s\n", pod.Namespace, pod.Name)
}
}
(4)运行 go run main.go
,输出如下:
> go run main.go
Found 713 pods:
Namespace: apo, Pod: apo-alertmanager-554fd4bbfc-mjp6q
Namespace: apo, Pod: apo-altinity-clickhouse-operator-cc87bb785-j9t9q
Namespace: apo, Pod: apo-backend-594c764b94-nd4zw
Namespace: apo, Pod: apo-collector-76979647fd-cmgg2
Namespace: apo, Pod: apo-front-5fbf5f4566-4rdh9
Namespace: apo, Pod: apo-grafana-57b98b9d59-f9k4h
Namespace: apo, Pod: apo-jaeger-collector-6d7bbc945d-6p95x
Namespace: apo, Pod: apo-odigos-instrumentor-8689cd45c6-xgvns
client-go
是与 Kubernetes 交互的 Go 语言标准工具,它是构建云原生工具的基础,适合用于开发自动化运维系统、Kubernetes 插件、监控工具等。
本文我们希望的是开发一个 Kubernetes 工具,单纯使用 client-go
略显麻烦,我们还需要使用另外一个 CLI 工具库——Cobra,它可以为我们开发提升很大的效率。
Cobra 工具库介绍
Cobra
是 Go 语言中最流行的命令行工具库,用于构建功能强大、结构清晰的现代 CLI(Command-Line Interface)应用程序。它被广泛用于许多知名开源项目中,例如:
- Kubernetes (
kubectl
) - Helm
- Docker (
docker CLI
)
它的核心特性有:
- Command:支持命令及子命令,如
app server
,app config set
。 - Flags:支持全局和局部 flag,如
--config
,-v
。 - Args:命令的参数,一般用来表示操作的对象,比如
kubectl get pod
,其中pod
就是操作的对象。 - 可以自动生成 help,输入
app --help
自动输出帮助信息。 - 可以 shell 环境下的自动补全。
对于 Command
,每个命令都是一个 &cobra.Command{}
,比如:
&cobra.Command{
Use: "hello", // 命令用法
Short: "简短描述",
Long: "长描述",
Run: func(cmd *cobra.Command, args []string) {
// 执行逻辑
},
}
而 Flags
分为 全局 Flag
和 局部 Flag
,取决你的 Flag 是假如的 rootCmd
还是 其他命令。
// 局部 flag
helloCmd.Flags().StringVarP(&name, "name", "n", "default", "姓名")
// 全局 flag(加到 rootCmd)
rootCmd.PersistentFlags().Bool("verbose", false, "启用详细日志")
另外,Flags
支持 String
、Bool
、Int
、Float
类型,如下:
- String 类型
- StringVar(&variable, "flag", "default", "description"),不带缩写
- StringVarP(&variable, "flag", ”shorthand“, "default", "description"),带缩写
- StringSliceVar(&variable, "flag", []string{}, "description"),可以接手数组,cli可以传入多个flag
- StringVarP(&variable, "flag", ”shorthand“, []string{}, "description")
- Bool 类型
- BoolVar(&variable, "flag", "default", "description"),不带缩写
- BoolVarP(&variable, "flag", ”shorthand“, "default", "description"),带缩写
- Int 类型
- IntVar(&variable, "flag", "default", "description"),不带缩写
- IntVarP(&variable, "flag", ”shorthand“, "default", "description"),带缩写
- Float 类型
- FloatVar(&variable, "flag", "default", "description"),不带缩写
- FloatVarP(&variable, "flag", ”shorthand“, "default", "description"),带缩写
下面我们开发一个最简单的 hello
命令。
(1)安装 Cobra
go get -u github.com/spf13/cobra/cobra
(2)初始化项目
mkdir hello && cd hello
go mod init hello
cobra-cli init
这会自动生成:
cmd/root.go
—— 主命令main.go
—— 入口- 自动支持
myapp --help
(3)添加子命令
cobra-cli add hello
(4)编辑 cmd/hello.go
,添加逻辑
package cmd
import (
"fmt"
"github.com/spf13/cobra"
)
var name string
var helloCmd = &cobra.Command{
Use: "hello",
Short: "打印问候语",
Long: `一个简单的命令,用于向某人问好`,
Run: func(cmd *cobra.Command, args []string) {
if name == "" {
name = "World"
}
fmt.Printf("Hello, %s!\n", name)
},
}
func init() {
rootCmd.AddCommand(helloCmd)
helloCmd.Flags().StringVarP(&name, "name", "n", "", "输入你的名字")
}
(5)运行程序,查看输出。
go run main.go hello
# 输出: Hello, World!
go run main.go hello --name=Jokerbai
# 输出: Hello, Jokerbai!
go run main.go hello -n Bob
# 输出: Hello, Bob!
go run main.go hello --help
# 一个简单的命令,用于向某人问好
#
# Usage:
# cobra-hello hello [flags]
#
# Flags:
# -h, --help help for hello
# -n, --name string 输入你的名字
通过上面简单的示例,我们不难发现通过 Cobra
,我们可以快速搭建起命令工具的架子,后续只需要结合业务对这个架子进行装修,省去不少麻烦。
开发 Chat K8s 命令行工具
上面我们对本文需要的基础工具进行了简单介绍,接下来我们将进入第一个主题:开发 Chat K8s 命令行工具。
我们这个工具需要实现的效果是:
- Command:k8scopilot ask chatgpt 进入交互模式
- 可以进行资源的部署,比如:
- 帮我部署一个 Deploy,镜像是 nginx
- 部署一个 Pod,镜像是 nginx
- 可以查询资源信息,比如:
- 查一下 default ns 的 Pod、SVC、Deploy 资源
- 可以删除资源,比如:
- 删除 default ns 下的 nginx deploy
下面我们进入正式的开发。
1.创建一个 k8scopilot
目录,使用 cobra-cli
初始化项目
mkdir k8scopilot && cd k8scopilot
go mod init
cobra-cli initr
2.添加第一个 ask
命令
cobra-cli add ask
3.接着,添加一个 chatgpt
命令,它是 ask
的子命令
cobra-cli add chatgpt -p "askCmd"
4.在 chatgpt
命令中实现一个简单的功能:
- 当用户执行
k8scopilot ask chatgpt
进入控制台输入 - 然后用户可以在控制台中进行对话
- 当用户输入
exit
退出
(1)增加一个 startChat
方法,用于处理生成对话回复
func startChat() {
scanner := bufio.NewScanner(os.Stdin)
fmt.Println("我是 K8S Copilot,你可以向我咨询 K8S 相关问题")
for {
fmt.Print("> ")
if scanner.Scan() {
input := scanner.Text()
if input == "exit" {
fmt.Println("退出对话")
break
}
if input == "" {
continue
}
fmt.Println("你输入的是:", input)
}
}
}
效果如下:
> .\k8scopilot.exe ask chatgpt
我是 K8S Copilot,你可以向我咨询 K8S 相关问题
> 部署资源
你输入的是: 部署资源
> exit
退出对话
(2)封装 OpenAI
客户端,方便后续调用大模型。创建一个 utils
目录,然后创建 openai.go
完成客户端封装。
package utils
import (
"context"
"errors"
"net/http"
"os"
"time"
"github.com/sashabaranov/go-openai"
)
const defaultBaseURL = "https://vip.apiyi.com/v1"
const defaultApiKey = "sk-xxxx"
type OpenAI struct {
Client *openai.Client
ctx context.Context
}
func NewOpenAI() *OpenAI {
apiKey := os.Getenv("OPENAI_API_KEY")
if apiKey == "" {
apiKey = defaultApiKey
}
apiBase := os.Getenv("OPENAI_API_BASE")
if apiBase == "" {
apiBase = defaultBaseURL
}
config := openai.DefaultConfig(apiKey)
config.BaseURL = apiBase
config.HTTPClient = &http.Client{
Timeout: 30 * time.Second,
}
client := openai.NewClientWithConfig(config)
return &OpenAI{
Client: client,
ctx: context.Background(),
}
}
func (op *OpenAI) SendMessage(prompt string, content string) (string, error) {
req := openai.ChatCompletionRequest{
Model: openai.GPT4oMini,
Messages: []openai.ChatCompletionMessage{
{
Role: "system",
Content: prompt,
},
{
Role: "user",
Content: content,
},
},
}
resp, err := op.Client.CreateChatCompletion(op.ctx, req)
if err != nil {
return "", err
}
if len(resp.Choices) == 0 {
return "", errors.New("no response")
}
return resp.Choices[0].Message.Content, nil
}
我们定义了 NewOpenAI
方法用来初始化 openai
,然后再定义了一个 SendMessage
方法用来和 GPT
进行交互。
(3)实现 FunctionCall
,在需要根据用户输入执行不同的操作的时候,我们在《大模型入门实战》章节有讲过 FunctionCall
,该项目我们将使用其实现。
这里只实现三个功能:
- 生成 YAML 清单并部署
- 查询 K8s 资源
- 删除 K8s 资源
在 chatgpt.go
中实现。
首先,定义一个工具函数 createTools
,用于定义 openai
的工具集。
func createTools() []openai.Tool {
// 用来生成 K8s YAML 并部署资源
f1 := openai.FunctionDefinition{
Name: "generateAndDeployResource",
Description: "生成 K8s YAML 并部署资源",
Parameters: jsonschema.Definition{
Type: jsonschema.Object,
Properties: map[string]jsonschema.Definition{
"user_input": {
Type: jsonschema.String,
Description: "用户输出的文本内容,要求包含资源类型和镜像",
},
},
Required: []string{"user_input"}, // 修复:应该是 user_input 而不是 location
},
}
// 用来查询 K8s 资源
f2 := openai.FunctionDefinition{
Name: "queryResource",
Description: "查询 K8s 资源",
Parameters: jsonschema.Definition{
Type: jsonschema.Object,
Properties: map[string]jsonschema.Definition{
"namespace": {
Type: jsonschema.String,
Description: "资源所在的命名空间",
},
"resource_type": {
Type: jsonschema.String,
Description: "K8s 资源标准类型,例如 Pod、Deployment、Service 等",
},
},
Required: []string{"namespace", "resource_type"}, // 添加必需参数
},
}
// 用来删除 K8s 资源
f3 := openai.FunctionDefinition{
Name: "deleteResource",
Description: "删除 K8s 资源",
Parameters: jsonschema.Definition{
Type: jsonschema.Object,
Properties: map[string]jsonschema.Definition{
"namespace": {
Type: jsonschema.String,
Description: "资源所在的命名空间",
},
"resource_type": {
Type: jsonschema.String,
Description: "K8s 资源标准类型,例如 Pod、Deployment、Service 等",
},
"resource_name": {
Type: jsonschema.String,
Description: "资源名称",
},
},
Required: []string{"namespace", "resource_type", "resource_name"}, // 添加必需参数
},
}
return []openai.Tool{
{Type: openai.ToolTypeFunction, Function: &f1},
{Type: openai.ToolTypeFunction, Function: &f2},
{Type: openai.ToolTypeFunction, Function: &f3},
}
}
接着,我们定义 functionCalling
方法,用于将工具注册到大模型以及执行工具调用。
func functionCalling(input string, client *utils.OpenAI) string {
tools := createTools()
dialogue := []openai.ChatCompletionMessage{
{Role: openai.ChatMessageRoleUser, Content: input},
}
// 调用 OpenAI API
resp, err := client.Client.CreateChatCompletion(context.TODO(),
openai.ChatCompletionRequest{
Model: openai.GPT4TurboPreview,
Messages: dialogue,
Tools: tools,
},
)
if err != nil {
return fmt.Sprintf("OpenAI API 调用失败: %v", err)
}
// 验证响应
if len(resp.Choices) == 0 {
return "OpenAI 返回了空的响应"
}
msg := resp.Choices[0].Message
// 如果没有工具调用,直接返回消息内容
if len(msg.ToolCalls) == 0 {
if msg.Content != "" {
return msg.Content
}
return "抱歉,我无法理解您的请求,请提供更具体的信息。"
}
// 处理多个工具调用(虽然当前逻辑假设只有一个)
if len(msg.ToolCalls) > 1 {
return "抱歉,当前不支持同时执行多个操作,请一次只执行一个操作。"
}
// 执行工具调用
toolCall := msg.ToolCalls[0]
result, err := callFunction(client, toolCall.Function.Name, toolCall.Function.Arguments)
if err != nil {
return fmt.Sprintf("执行操作失败: %v", err)
}
return result
}
在 functionCalling
中,我们调用了 callFunction
方法执行工具调用,下面实现 callFuntion
,它将根据 OpenAI
返回的消息,调用对应的函数。
// 根据 OpenAI 返回的消息,调用对应的函数
func callFunction(client *utils.OpenAI, name, arguments string) (string, error) {
switch name {
case "generateAndDeployResource":
params := struct {
UserInput string `json:"user_input"`
}{}
if err := json.Unmarshal([]byte(arguments), ¶ms); err != nil {
return "", fmt.Errorf("解析生成部署资源参数失败: %v", err)
}
return generateAndDeployResource(client, params.UserInput)
case "queryResource":
params := struct {
Namespace string `json:"namespace"`
ResourceType string `json:"resource_type"`
}{}
if err := json.Unmarshal([]byte(arguments), ¶ms); err != nil {
return "", fmt.Errorf("解析查询资源参数失败: %v", err)
}
return queryResource(params.Namespace, params.ResourceType)
case "deleteResource":
params := struct {
Namespace string `json:"namespace"`
ResourceType string `json:"resource_type"`
ResourceName string `json:"resource_name"`
}{}
if err := json.Unmarshal([]byte(arguments), ¶ms); err != nil {
return "", fmt.Errorf("解析删除资源参数失败: %v", err)
}
return deleteResource(params.Namespace, params.ResourceType, params.ResourceName)
default:
return "", fmt.Errorf("未知的函数: %s", name)
}
}
接下来,我们需要实现具体的操作方法,它们分别是:
- generateAndDeployResource
- queryResource
- deleteResource
在这之前,我们需要在 utils
中封装 client-go
,方便后续调用。
// utils/client_go.go
package utils
import (
"fmt"
"path/filepath"
"strings"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
)
// ClientGo encapsulates both clientset and dynamicClient for Kubernetes operations
type ClientGo struct {
Clientset *kubernetes.Clientset
DynamicClient dynamic.Interface
DiscoveryClient discovery.DiscoveryInterface
}
// NewClientGo initializes a new ClientGo instance with the provided kubeconfig path
func NewClientGo(kubeconfig string) (*ClientGo, error) {
// Handle ~ in the kubeconfig path
if strings.HasPrefix(kubeconfig, "~") {
homeDir := homedir.HomeDir()
kubeconfig = filepath.Join(homeDir, kubeconfig[1:])
}
// Build the configuration from the kubeconfig file
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
return nil, fmt.Errorf("failed to build kubeconfig: %w", err)
}
// Create the clientset
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to create clientset: %w", err)
}
// Create the dynamic client
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to create dynamic client: %w", err)
}
// Create DiscoveryClient
discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to create discovery client: %w", err)
}
return &ClientGo{
Clientset: clientset,
DynamicClient: dynamicClient,
DiscoveryClient: discoveryClient,
}, nil
}
然后,我们再来实现具体的方法。
其中,generateAndDeployResource
是用来生成 YAML 清单以及部署资源的,其逻辑就是调用大模型生成纯净的 K8s YAML 资源,然后使用 Client-go
完成资源部署。
// 生成 K8s YAML 并部署资源
func generateAndDeployResource(client *utils.OpenAI, userInput string) (string, error) {
// 生成 YAML 内容
yamlContent, err := client.SendMessage("你现在是一个 K8s 资源生成器,根据用户输入生成 K8s YAML,注意除了 YAML 内容以外不要输出任何内容,此外不要把 YAML 放在 ``` 代码快里", userInput)
if err != nil {
return "", fmt.Errorf("生成 YAML 失败: %v", err)
}
// 创建 Kubernetes 客户端
clientGo, err := utils.NewClientGo(kubeconfig) // kubeconfig 是一个全局 Flag
if err != nil {
return "", fmt.Errorf("创建 Kubernetes 客户端失败: %v", err)
}
// 获取 API 资源
resources, err := restmapper.GetAPIGroupResources(clientGo.DiscoveryClient)
if err != nil {
return "", fmt.Errorf("获取 API 资源失败: %v", err)
}
// 将 YAML 转成 unstructured 对象
unstructuredObj := &unstructured.Unstructured{}
_, _, err = scheme.Codecs.UniversalDeserializer().Decode([]byte(yamlContent), nil, unstructuredObj)
if err != nil {
return "", fmt.Errorf("解析 YAML 失败: %v", err)
}
// 创建 REST mapper
mapper := restmapper.NewDiscoveryRESTMapper(resources)
// 从 unstructuredObj 中提取 GVK
gvk := unstructuredObj.GroupVersionKind()
// 用 GVK 转 GVR
mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return "", fmt.Errorf("映射资源类型失败: %v", err)
}
// 设置默认命名空间
namespace := unstructuredObj.GetNamespace()
if namespace == "" {
namespace = "default"
}
// 部署资源
_, err = clientGo.DynamicClient.Resource(mapping.Resource).Namespace(namespace).Create(context.TODO(), unstructuredObj, metav1.CreateOptions{})
if err != nil {
return "", fmt.Errorf("部署资源失败: %v", err)
}
resourceName := unstructuredObj.GetName()
resourceKind := unstructuredObj.GetKind()
return fmt.Sprintf("✅ 成功部署 %s/%s 到命名空间 %s\n\n生成的 YAML:\n%s", resourceKind, resourceName, namespace, yamlContent), nil
}
对于 queryResource
是用来查询资源信息的,不同的资源信息有不同的 GVR
,如下:
// 查询 K8s 资源
func queryResource(namespace, resourceType string) (string, error) {
// 创建 Kubernetes 客户端
clientGo, err := utils.NewClientGo(kubeconfig)
if err != nil {
return "", fmt.Errorf("创建 Kubernetes 客户端失败: %v", err)
}
// 标准化资源类型
resourceType = strings.ToLower(resourceType)
var gvr schema.GroupVersionResource
var displayName string
switch resourceType {
case "deployment", "deployments":
gvr = schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}
displayName = "Deployment"
case "service", "services", "svc":
gvr = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"}
displayName = "Service"
case "pod", "pods":
gvr = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
displayName = "Pod"
case "configmap", "configmaps", "cm":
gvr = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "configmaps"}
displayName = "ConfigMap"
case "secret", "secrets":
gvr = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "secrets"}
displayName = "Secret"
default:
return "", fmt.Errorf("不支持的资源类型: %s。支持的类型: deployment, service, pod, configmap, secret", resourceType)
}
// 查询资源
resourceList, err := clientGo.DynamicClient.Resource(gvr).Namespace(namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return "", fmt.Errorf("查询资源失败: %v", err)
}
// 格式化输出结果
if len(resourceList.Items) == 0 {
return fmt.Sprintf("在命名空间 '%s' 中未找到任何 %s 资源", namespace, displayName), nil
}
result := fmt.Sprintf("在命名空间 '%s' 中找到 %d 个 %s 资源:\n\n", namespace, len(resourceList.Items), displayName)
for i, item := range resourceList.Items {
result += fmt.Sprintf("%d. %s\n", i+1, item.GetName())
// 添加一些额外信息
if creationTime := item.GetCreationTimestamp(); !creationTime.IsZero() {
result += fmt.Sprintf(" 创建时间: %s\n", creationTime.Format("2006-01-02 15:04:05"))
}
result += "\n"
}
return result, nil
}
最后,deleteResource
是用来删除资源信息的,和 queryResource
逻辑差不多,如下:
// 删除 K8s 资源
func deleteResource(namespace, resourceType, resourceName string) (string, error) {
// 创建 Kubernetes 客户端
clientGo, err := utils.NewClientGo(kubeconfig)
if err != nil {
return "", fmt.Errorf("创建 Kubernetes 客户端失败: %v", err)
}
// 标准化资源类型
resourceType = strings.ToLower(resourceType)
var gvr schema.GroupVersionResource
var displayName string
switch resourceType {
case "deployment", "deployments":
gvr = schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}
displayName = "Deployment"
case "service", "services", "svc":
gvr = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"}
displayName = "Service"
case "pod", "pods":
gvr = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
displayName = "Pod"
case "configmap", "configmaps", "cm":
gvr = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "configmaps"}
displayName = "ConfigMap"
case "secret", "secrets":
gvr = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "secrets"}
displayName = "Secret"
default:
return "", fmt.Errorf("不支持的资源类型: %s。支持的类型: deployment, service, pod, configmap, secret", resourceType)
}
// 检查资源是否存在
_, err = clientGo.DynamicClient.Resource(gvr).Namespace(namespace).Get(context.TODO(), resourceName, metav1.GetOptions{})
if err != nil {
return "", fmt.Errorf("资源 %s/%s 在命名空间 '%s' 中不存在: %v", displayName, resourceName, namespace, err)
}
// 删除资源
err = clientGo.DynamicClient.Resource(gvr).Namespace(namespace).Delete(context.TODO(), resourceName, metav1.DeleteOptions{})
if err != nil {
return "", fmt.Errorf("删除资源失败: %v", err)
}
return fmt.Sprintf("成功删除 %s/%s 从命名空间 '%s'", displayName, resourceName, namespace), nil
}
(4)我们将 functionCall
和 startChat
结合起来。
首先,增加一个 processInput
方法,主要用来初始化 openai
,并将用户输入传递给大模型。
func processInput(input string) string {
// 1. 初始化 openai
client, err := utils.NewOpenAIClient()
if err != nil {
return err.Error()
}
// 2. 封装 utils/openai.go,调用 OpenAI API 得到回复
// response, err := client.SendMessage("你好", input)
// 3. 调用 OpenAI Function calling
response := functionCalling(input, client)
return response
}
最后,在 startChat
中调用 processInput
即可。
func startChat() {
scanner := bufio.NewScanner(os.Stdin)
fmt.Println("我是 K8S Copilot,你可以向我咨询 K8S 相关问题")
for {
fmt.Print("> ")
if scanner.Scan() {
input := scanner.Text()
if input == "exit" {
fmt.Println("退出对话")
break
}
if input == "" {
continue
}
response := processInput(input)
fmt.Println(response)
}
}
}
(5)执行效果如下
总结
本文探讨了如何将人工智能技术与 Kubernetes 运维实践相结合,成功构建了一个名为 k8scopilot
的智能命令行助手。通过整合 client-go
、Cobra
和 OpenAI
的 Function Calling
等关键技术,我们实现了一个能够理解自然语言指令并执行相应 Kubernetes 操作的 AI Agent。
我们首先介绍了与 Kubernetes 集群交互的核心工具 client-go
,它是所有自动化操作的基石。接着,我们利用 Cobra
库高效地构建了结构清晰、易于扩展的命令行界面。最后,通过 OpenAI 的大语言模型,特别是其 Function Calling 能力,我们将模糊的自然语言转换为精确的、可编程的 API 调用,实现了从“人理解机器命令”到“机器理解人”的范式转变。