Skip to content

开发一个 K8s Chat 命令行工具 原创

✍ 道路千万条,安全第一条。操作不规范,运维两行泪。

在前面我们介绍了[[03.大模型入门实战]]和 [[04.Agent入门实战]],了解了 AI 开发的基本流程,本章节我们将使用讨论如何将 Kubernetes 和 AI 结合起来。

本文主要从以下几个方面进行介绍:

  • 怎么和 Kubernetes 进行交互
  • Cobra 工具库介绍
  • 开发 Chat K8s 命令行工具

Tips:文章只是起到抛砖引玉的作用,做大做强还需各位自己努力。

怎么和 Kubernetes 进行交互

Kubernetes 集群是云原生时代中最重要的基础设施,它里面运行着成千上万的 Pods,在日常的运维工作中,SRE 常用 kubectl 命令与它进行交互。

kubectl 是官方开发的客户端,已经将所有常用的操作集成到里面了。但是, 如果我们要通过接口与 Kubernetes 进行交互,就需要使用官方的一个核心库 Client-go

client-go 是 Kubernetes 官方提供的 Go 语言客户端库,用于与 Kubernetes API Server 进行交互。它让你可以通过编程的方式创建、更新、删除和查询 Kubernetes 资源(如 Pod、Deployment、Service 等),是开发 Kubernetes 控制器、Operator、自定义调度器或运维工具的核心依赖。

client-go 的核心组件主要有:

  • Clientset:最常用的客户端集合,包含对 core、apps、networking 等 API 访问。
  • Informer:监听资源变化,如 add、update、delete,避免频繁轮询。
  • Lister:快速从本地缓存读取资源,主要是配合 Informer 使用。
  • Workqueue:处理异步任务队列,常用于控制器中
  • ...

下面我们举一个简单的例子获取所有 Pod 列表。

(1)初始化一个项目

bash
mkdir k8s-pod-list && cd k8s-pod-list
go mod init k8s-pod-list

(2)安装 client-go

bash
go get k8s.io/client-go/kubernetes
go get k8s.io/client-go/tools/clientcmd
go get k8s.io/client-go/rest

(3)编写代码

go
package main

import (
    "context"
    "fmt"
    "log"
    "path/filepath"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/homedir"
)

func main() {
    // 1. 构建 kubeconfig 路径(默认 ~/.kube/config)
    var kubeconfig string
    if home := homedir.HomeDir(); home != "" {
        kubeconfig = filepath.Join(home, ".kube", "config")
    }

    // 2. 加载 kubeconfig 文件,生成配置
    config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
    if err != nil {
        log.Fatal(err)
    }

    // 3. 创建 Kubernetes 客户端
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        log.Fatal(err)
    }

    // 4. 获取所有 Pod
    pods, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{})
    if err != nil {
        log.Fatal(err)
    }

    // 5. 打印每个 Pod 的命名空间和名称
    fmt.Printf("Found %d pods:\n", len(pods.Items))
    for _, pod := range pods.Items {
        fmt.Printf("Namespace: %s, Pod: %s\n", pod.Namespace, pod.Name)
    }
}

(4)运行 go run main.go,输出如下:

bash
> go run main.go
Found 713 pods:
Namespace: apo, Pod: apo-alertmanager-554fd4bbfc-mjp6q
Namespace: apo, Pod: apo-altinity-clickhouse-operator-cc87bb785-j9t9q
Namespace: apo, Pod: apo-backend-594c764b94-nd4zw
Namespace: apo, Pod: apo-collector-76979647fd-cmgg2
Namespace: apo, Pod: apo-front-5fbf5f4566-4rdh9
Namespace: apo, Pod: apo-grafana-57b98b9d59-f9k4h
Namespace: apo, Pod: apo-jaeger-collector-6d7bbc945d-6p95x
Namespace: apo, Pod: apo-odigos-instrumentor-8689cd45c6-xgvns

client-go 是与 Kubernetes 交互的 Go 语言标准工具,它是构建云原生工具的基础,适合用于开发自动化运维系统、Kubernetes 插件、监控工具等。

本文我们希望的是开发一个 Kubernetes 工具,单纯使用 client-go 略显麻烦,我们还需要使用另外一个 CLI 工具库——Cobra,它可以为我们开发提升很大的效率。

Cobra 工具库介绍

Cobra 是 Go 语言中最流行的命令行工具库,用于构建功能强大、结构清晰的现代 CLI(Command-Line Interface)应用程序。它被广泛用于许多知名开源项目中,例如:

  • Kubernetes (kubectl)
  • Helm
  • Docker (docker CLI)

它的核心特性有:

  • Command:支持命令及子命令,如 app serverapp config set
  • Flags:支持全局和局部 flag,如 --config-v
  • Args:命令的参数,一般用来表示操作的对象,比如 kubectl get pod,其中 pod 就是操作的对象。
  • 可以自动生成 help,输入 app --help 自动输出帮助信息。
  • 可以 shell 环境下的自动补全。

对于 Command,每个命令都是一个 &cobra.Command{},比如:

go
&cobra.Command{
    Use:   "hello",              // 命令用法
    Short: "简短描述",
    Long:  "长描述",
    Run: func(cmd *cobra.Command, args []string) {
        // 执行逻辑
    },
}

Flags 分为 全局 Flag局部 Flag,取决你的 Flag 是假如的 rootCmd 还是 其他命令。

go
// 局部 flag
helloCmd.Flags().StringVarP(&name, "name", "n", "default", "姓名")

// 全局 flag(加到 rootCmd)
rootCmd.PersistentFlags().Bool("verbose", false, "启用详细日志")

另外,Flags 支持 StringBoolIntFloat 类型,如下:

  • String 类型
    • StringVar(&variable, "flag", "default", "description"),不带缩写
    • StringVarP(&variable, "flag", ”shorthand“, "default", "description"),带缩写
    • StringSliceVar(&variable, "flag", []string{}, "description"),可以接手数组,cli可以传入多个flag
    • StringVarP(&variable, "flag", ”shorthand“, []string{}, "description")
  • Bool 类型
    • BoolVar(&variable, "flag", "default", "description"),不带缩写
    • BoolVarP(&variable, "flag", ”shorthand“, "default", "description"),带缩写
  • Int 类型
    • IntVar(&variable, "flag", "default", "description"),不带缩写
    • IntVarP(&variable, "flag", ”shorthand“, "default", "description"),带缩写
  • Float 类型
    • FloatVar(&variable, "flag", "default", "description"),不带缩写
    • FloatVarP(&variable, "flag", ”shorthand“, "default", "description"),带缩写

下面我们开发一个最简单的 hello命令。

(1)安装 Cobra

bash
go get -u github.com/spf13/cobra/cobra

(2)初始化项目

mkdir hello && cd hello
go mod init hello
cobra-cli init

这会自动生成:

  • cmd/root.go —— 主命令
  • main.go —— 入口
  • 自动支持 myapp --help

(3)添加子命令

bash
cobra-cli add hello

(4)编辑 cmd/hello.go,添加逻辑

go
package cmd

import (
    "fmt"
    "github.com/spf13/cobra"
)

var name string

var helloCmd = &cobra.Command{
    Use:   "hello",
    Short: "打印问候语",
    Long:  `一个简单的命令,用于向某人问好`,
    Run: func(cmd *cobra.Command, args []string) {
        if name == "" {
            name = "World"
        }
        fmt.Printf("Hello, %s!\n", name)
    },
}

func init() {
    rootCmd.AddCommand(helloCmd)
    helloCmd.Flags().StringVarP(&name, "name", "n", "", "输入你的名字")
}

(5)运行程序,查看输出。

bash
go run main.go hello
# 输出: Hello, World!

go run main.go hello --name=Jokerbai
# 输出: Hello, Jokerbai!

go run main.go hello -n Bob
# 输出: Hello, Bob!

go run main.go hello --help
# 一个简单的命令,用于向某人问好
#
# Usage:
#  cobra-hello hello [flags]
#
# Flags:
#   -h, --help          help for hello
#   -n, --name string   输入你的名字

通过上面简单的示例,我们不难发现通过 Cobra,我们可以快速搭建起命令工具的架子,后续只需要结合业务对这个架子进行装修,省去不少麻烦。

开发 Chat K8s 命令行工具

上面我们对本文需要的基础工具进行了简单介绍,接下来我们将进入第一个主题:开发 Chat K8s 命令行工具。

我们这个工具需要实现的效果是:

  • Command:k8scopilot ask chatgpt 进入交互模式
  • 可以进行资源的部署,比如:
    • 帮我部署一个 Deploy,镜像是 nginx
    • 部署一个 Pod,镜像是 nginx
  • 可以查询资源信息,比如:
    • 查一下 default ns 的 Pod、SVC、Deploy 资源
  • 可以删除资源,比如:
    • 删除 default ns 下的 nginx deploy

下面我们进入正式的开发。

1.创建一个 k8scopilot目录,使用 cobra-cli 初始化项目

bash
mkdir k8scopilot && cd k8scopilot
go mod init
cobra-cli initr

2.添加第一个 ask 命令

bash
cobra-cli add ask

3.接着,添加一个 chatgpt 命令,它是 ask 的子命令

bash
cobra-cli add chatgpt -p "askCmd"

4.在 chatgpt 命令中实现一个简单的功能:

  • 当用户执行 k8scopilot ask chatgpt 进入控制台输入
  • 然后用户可以在控制台中进行对话
  • 当用户输入 exit 退出

(1)增加一个 startChat 方法,用于处理生成对话回复

go
func startChat() {
	scanner := bufio.NewScanner(os.Stdin)
	fmt.Println("我是 K8S Copilot,你可以向我咨询 K8S 相关问题")

	for {
		fmt.Print("> ")
		if scanner.Scan() {
			input := scanner.Text()
			if input == "exit" {
				fmt.Println("退出对话")
				break
			}
			if input == "" {
				continue
			}
			fmt.Println("你输入的是:", input)
		}
	}
}

效果如下:

bash
> .\k8scopilot.exe ask chatgpt
我是 K8S Copilot,你可以向我咨询 K8S 相关问题
> 部署资源
你输入的是: 部署资源
> exit
退出对话

(2)封装 OpenAI 客户端,方便后续调用大模型。创建一个 utils 目录,然后创建 openai.go 完成客户端封装。

go
package utils

import (
	"context"
	"errors"
	"net/http"
	"os"
	"time"

	"github.com/sashabaranov/go-openai"
)

const defaultBaseURL = "https://vip.apiyi.com/v1"
const defaultApiKey = "sk-xxxx"

type OpenAI struct {
	Client *openai.Client
	ctx    context.Context
}

func NewOpenAI() *OpenAI {
	apiKey := os.Getenv("OPENAI_API_KEY")
	if apiKey == "" {
		apiKey = defaultApiKey
	}
	apiBase := os.Getenv("OPENAI_API_BASE")
	if apiBase == "" {
		apiBase = defaultBaseURL
	}
	config := openai.DefaultConfig(apiKey)
	config.BaseURL = apiBase
	config.HTTPClient = &http.Client{
		Timeout: 30 * time.Second,
	}
	client := openai.NewClientWithConfig(config)

	return &OpenAI{
		Client: client,
		ctx:    context.Background(),
	}
}

func (op *OpenAI) SendMessage(prompt string, content string) (string, error) {
	req := openai.ChatCompletionRequest{
		Model: openai.GPT4oMini,
		Messages: []openai.ChatCompletionMessage{
			{
				Role:    "system",
				Content: prompt,
			},
			{
				Role:    "user",
				Content: content,
			},
		},
	}
	resp, err := op.Client.CreateChatCompletion(op.ctx, req)
	if err != nil {
		return "", err
	}
	if len(resp.Choices) == 0 {
		return "", errors.New("no response")
	}
	return resp.Choices[0].Message.Content, nil
}

我们定义了 NewOpenAI 方法用来初始化 openai,然后再定义了一个 SendMessage 方法用来和 GPT 进行交互。

(3)实现 FunctionCall,在需要根据用户输入执行不同的操作的时候,我们在《大模型入门实战》章节有讲过 FunctionCall,该项目我们将使用其实现。

这里只实现三个功能:

  • 生成 YAML 清单并部署
  • 查询 K8s 资源
  • 删除 K8s 资源

chatgpt.go 中实现。

首先,定义一个工具函数 createTools,用于定义 openai 的工具集。

go
func createTools() []openai.Tool {
	// 用来生成 K8s YAML 并部署资源
	f1 := openai.FunctionDefinition{
		Name:        "generateAndDeployResource",
		Description: "生成 K8s YAML 并部署资源",
		Parameters: jsonschema.Definition{
			Type: jsonschema.Object,
			Properties: map[string]jsonschema.Definition{
				"user_input": {
					Type:        jsonschema.String,
					Description: "用户输出的文本内容,要求包含资源类型和镜像",
				},
			},
			Required: []string{"user_input"}, // 修复:应该是 user_input 而不是 location
		},
	}

	// 用来查询 K8s 资源
	f2 := openai.FunctionDefinition{
		Name:        "queryResource",
		Description: "查询 K8s 资源",
		Parameters: jsonschema.Definition{
			Type: jsonschema.Object,
			Properties: map[string]jsonschema.Definition{
				"namespace": {
					Type:        jsonschema.String,
					Description: "资源所在的命名空间",
				},
				"resource_type": {
					Type:        jsonschema.String,
					Description: "K8s 资源标准类型,例如 Pod、Deployment、Service 等",
				},
			},
			Required: []string{"namespace", "resource_type"}, // 添加必需参数
		},
	}

	// 用来删除 K8s 资源
	f3 := openai.FunctionDefinition{
		Name:        "deleteResource",
		Description: "删除 K8s 资源",
		Parameters: jsonschema.Definition{
			Type: jsonschema.Object,
			Properties: map[string]jsonschema.Definition{
				"namespace": {
					Type:        jsonschema.String,
					Description: "资源所在的命名空间",
				},
				"resource_type": {
					Type:        jsonschema.String,
					Description: "K8s 资源标准类型,例如 Pod、Deployment、Service 等",
				},
				"resource_name": {
					Type:        jsonschema.String,
					Description: "资源名称",
				},
			},
			Required: []string{"namespace", "resource_type", "resource_name"}, // 添加必需参数
		},
	}

	return []openai.Tool{
		{Type: openai.ToolTypeFunction, Function: &f1},
		{Type: openai.ToolTypeFunction, Function: &f2},
		{Type: openai.ToolTypeFunction, Function: &f3},
	}
}

接着,我们定义 functionCalling 方法,用于将工具注册到大模型以及执行工具调用。

go
func functionCalling(input string, client *utils.OpenAI) string {
	tools := createTools()

	dialogue := []openai.ChatCompletionMessage{
		{Role: openai.ChatMessageRoleUser, Content: input},
	}

	// 调用 OpenAI API
	resp, err := client.Client.CreateChatCompletion(context.TODO(),
		openai.ChatCompletionRequest{
			Model:    openai.GPT4TurboPreview,
			Messages: dialogue,
			Tools:    tools,
		},
	)
	if err != nil {
		return fmt.Sprintf("OpenAI API 调用失败: %v", err)
	}

	// 验证响应
	if len(resp.Choices) == 0 {
		return "OpenAI 返回了空的响应"
	}

	msg := resp.Choices[0].Message

	// 如果没有工具调用,直接返回消息内容
	if len(msg.ToolCalls) == 0 {
		if msg.Content != "" {
			return msg.Content
		}
		return "抱歉,我无法理解您的请求,请提供更具体的信息。"
	}

	// 处理多个工具调用(虽然当前逻辑假设只有一个)
	if len(msg.ToolCalls) > 1 {
		return "抱歉,当前不支持同时执行多个操作,请一次只执行一个操作。"
	}

	// 执行工具调用
	toolCall := msg.ToolCalls[0]
	result, err := callFunction(client, toolCall.Function.Name, toolCall.Function.Arguments)
	if err != nil {
		return fmt.Sprintf("执行操作失败: %v", err)
	}
	return result
}

functionCalling 中,我们调用了 callFunction 方法执行工具调用,下面实现 callFuntion ,它将根据 OpenAI 返回的消息,调用对应的函数。

go
// 根据 OpenAI 返回的消息,调用对应的函数
func callFunction(client *utils.OpenAI, name, arguments string) (string, error) {
	switch name {
	case "generateAndDeployResource":
		params := struct {
			UserInput string `json:"user_input"`
		}{}
		if err := json.Unmarshal([]byte(arguments), &params); err != nil {
			return "", fmt.Errorf("解析生成部署资源参数失败: %v", err)
		}
		return generateAndDeployResource(client, params.UserInput)

	case "queryResource":
		params := struct {
			Namespace    string `json:"namespace"`
			ResourceType string `json:"resource_type"`
		}{}
		if err := json.Unmarshal([]byte(arguments), &params); err != nil {
			return "", fmt.Errorf("解析查询资源参数失败: %v", err)
		}
		return queryResource(params.Namespace, params.ResourceType)

	case "deleteResource":
		params := struct {
			Namespace    string `json:"namespace"`
			ResourceType string `json:"resource_type"`
			ResourceName string `json:"resource_name"`
		}{}
		if err := json.Unmarshal([]byte(arguments), &params); err != nil {
			return "", fmt.Errorf("解析删除资源参数失败: %v", err)
		}
		return deleteResource(params.Namespace, params.ResourceType, params.ResourceName)

	default:
		return "", fmt.Errorf("未知的函数: %s", name)
	}
}

接下来,我们需要实现具体的操作方法,它们分别是:

  • generateAndDeployResource
  • queryResource
  • deleteResource

在这之前,我们需要在 utils 中封装 client-go,方便后续调用。

go
// utils/client_go.go
package utils

import (
	"fmt"
	"path/filepath"
	"strings"

	"k8s.io/client-go/discovery"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
)

// ClientGo encapsulates both clientset and dynamicClient for Kubernetes operations
type ClientGo struct {
	Clientset       *kubernetes.Clientset
	DynamicClient   dynamic.Interface
	DiscoveryClient discovery.DiscoveryInterface
}

// NewClientGo initializes a new ClientGo instance with the provided kubeconfig path
func NewClientGo(kubeconfig string) (*ClientGo, error) {
	// Handle ~ in the kubeconfig path
	if strings.HasPrefix(kubeconfig, "~") {
		homeDir := homedir.HomeDir()
		kubeconfig = filepath.Join(homeDir, kubeconfig[1:])
	}

	// Build the configuration from the kubeconfig file
	config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
	if err != nil {
		return nil, fmt.Errorf("failed to build kubeconfig: %w", err)
	}

	// Create the clientset
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, fmt.Errorf("failed to create clientset: %w", err)
	}

	// Create the dynamic client
	dynamicClient, err := dynamic.NewForConfig(config)
	if err != nil {
		return nil, fmt.Errorf("failed to create dynamic client: %w", err)
	}

	// Create DiscoveryClient
	discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
	if err != nil {
		return nil, fmt.Errorf("failed to create discovery client: %w", err)
	}

	return &ClientGo{
		Clientset:       clientset,
		DynamicClient:   dynamicClient,
		DiscoveryClient: discoveryClient,
	}, nil
}

然后,我们再来实现具体的方法。

其中,generateAndDeployResource 是用来生成 YAML 清单以及部署资源的,其逻辑就是调用大模型生成纯净的 K8s YAML 资源,然后使用 Client-go 完成资源部署。

go
// 生成 K8s YAML 并部署资源
func generateAndDeployResource(client *utils.OpenAI, userInput string) (string, error) {
	// 生成 YAML 内容
	yamlContent, err := client.SendMessage("你现在是一个 K8s 资源生成器,根据用户输入生成 K8s YAML,注意除了 YAML 内容以外不要输出任何内容,此外不要把 YAML 放在 ``` 代码快里", userInput)
	if err != nil {
		return "", fmt.Errorf("生成 YAML 失败: %v", err)
	}

	// 创建 Kubernetes 客户端
	clientGo, err := utils.NewClientGo(kubeconfig) // kubeconfig 是一个全局 Flag
	if err != nil {
		return "", fmt.Errorf("创建 Kubernetes 客户端失败: %v", err)
	}

	// 获取 API 资源
	resources, err := restmapper.GetAPIGroupResources(clientGo.DiscoveryClient)
	if err != nil {
		return "", fmt.Errorf("获取 API 资源失败: %v", err)
	}

	// 将 YAML 转成 unstructured 对象
	unstructuredObj := &unstructured.Unstructured{}
	_, _, err = scheme.Codecs.UniversalDeserializer().Decode([]byte(yamlContent), nil, unstructuredObj)
	if err != nil {
		return "", fmt.Errorf("解析 YAML 失败: %v", err)
	}

	// 创建 REST mapper
	mapper := restmapper.NewDiscoveryRESTMapper(resources)
	// 从 unstructuredObj 中提取 GVK
	gvk := unstructuredObj.GroupVersionKind()
	// 用 GVK 转 GVR
	mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
	if err != nil {
		return "", fmt.Errorf("映射资源类型失败: %v", err)
	}

	// 设置默认命名空间
	namespace := unstructuredObj.GetNamespace()
	if namespace == "" {
		namespace = "default"
	}

	// 部署资源
	_, err = clientGo.DynamicClient.Resource(mapping.Resource).Namespace(namespace).Create(context.TODO(), unstructuredObj, metav1.CreateOptions{})
	if err != nil {
		return "", fmt.Errorf("部署资源失败: %v", err)
	}

	resourceName := unstructuredObj.GetName()
	resourceKind := unstructuredObj.GetKind()
	return fmt.Sprintf("✅ 成功部署 %s/%s 到命名空间 %s\n\n生成的 YAML:\n%s", resourceKind, resourceName, namespace, yamlContent), nil
}

对于 queryResource 是用来查询资源信息的,不同的资源信息有不同的 GVR,如下:

go
// 查询 K8s 资源
func queryResource(namespace, resourceType string) (string, error) {
	// 创建 Kubernetes 客户端
	clientGo, err := utils.NewClientGo(kubeconfig)
	if err != nil {
		return "", fmt.Errorf("创建 Kubernetes 客户端失败: %v", err)
	}

	// 标准化资源类型
	resourceType = strings.ToLower(resourceType)
	var gvr schema.GroupVersionResource
	var displayName string

	switch resourceType {
	case "deployment", "deployments":
		gvr = schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}
		displayName = "Deployment"
	case "service", "services", "svc":
		gvr = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"}
		displayName = "Service"
	case "pod", "pods":
		gvr = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
		displayName = "Pod"
	case "configmap", "configmaps", "cm":
		gvr = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "configmaps"}
		displayName = "ConfigMap"
	case "secret", "secrets":
		gvr = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "secrets"}
		displayName = "Secret"
	default:
		return "", fmt.Errorf("不支持的资源类型: %s。支持的类型: deployment, service, pod, configmap, secret", resourceType)
	}

	// 查询资源
	resourceList, err := clientGo.DynamicClient.Resource(gvr).Namespace(namespace).List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		return "", fmt.Errorf("查询资源失败: %v", err)
	}

	// 格式化输出结果
	if len(resourceList.Items) == 0 {
		return fmt.Sprintf("在命名空间 '%s' 中未找到任何 %s 资源", namespace, displayName), nil
	}

	result := fmt.Sprintf("在命名空间 '%s' 中找到 %d%s 资源:\n\n", namespace, len(resourceList.Items), displayName)
	for i, item := range resourceList.Items {
		result += fmt.Sprintf("%d. %s\n", i+1, item.GetName())

		// 添加一些额外信息
		if creationTime := item.GetCreationTimestamp(); !creationTime.IsZero() {
			result += fmt.Sprintf("   创建时间: %s\n", creationTime.Format("2006-01-02 15:04:05"))
		}
		result += "\n"
	}

	return result, nil
}

最后,deleteResource 是用来删除资源信息的,和 queryResource 逻辑差不多,如下:

go
// 删除 K8s 资源
func deleteResource(namespace, resourceType, resourceName string) (string, error) {
	// 创建 Kubernetes 客户端
	clientGo, err := utils.NewClientGo(kubeconfig)
	if err != nil {
		return "", fmt.Errorf("创建 Kubernetes 客户端失败: %v", err)
	}

	// 标准化资源类型
	resourceType = strings.ToLower(resourceType)
	var gvr schema.GroupVersionResource
	var displayName string

	switch resourceType {
	case "deployment", "deployments":
		gvr = schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}
		displayName = "Deployment"
	case "service", "services", "svc":
		gvr = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"}
		displayName = "Service"
	case "pod", "pods":
		gvr = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
		displayName = "Pod"
	case "configmap", "configmaps", "cm":
		gvr = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "configmaps"}
		displayName = "ConfigMap"
	case "secret", "secrets":
		gvr = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "secrets"}
		displayName = "Secret"
	default:
		return "", fmt.Errorf("不支持的资源类型: %s。支持的类型: deployment, service, pod, configmap, secret", resourceType)
	}

	// 检查资源是否存在
	_, err = clientGo.DynamicClient.Resource(gvr).Namespace(namespace).Get(context.TODO(), resourceName, metav1.GetOptions{})
	if err != nil {
		return "", fmt.Errorf("资源 %s/%s 在命名空间 '%s' 中不存在: %v", displayName, resourceName, namespace, err)
	}

	// 删除资源
	err = clientGo.DynamicClient.Resource(gvr).Namespace(namespace).Delete(context.TODO(), resourceName, metav1.DeleteOptions{})
	if err != nil {
		return "", fmt.Errorf("删除资源失败: %v", err)
	}

	return fmt.Sprintf("成功删除 %s/%s 从命名空间 '%s'", displayName, resourceName, namespace), nil
}

(4)我们将 functionCallstartChat 结合起来。

首先,增加一个 processInput 方法,主要用来初始化 openai,并将用户输入传递给大模型。

go
func processInput(input string) string {
	// 1. 初始化 openai
	client, err := utils.NewOpenAIClient()
	if err != nil {
		return err.Error()
	}
	// 2. 封装 utils/openai.go,调用 OpenAI API 得到回复
	// response, err := client.SendMessage("你好", input)

	// 3. 调用 OpenAI Function calling
	response := functionCalling(input, client)
	return response
}

最后,在 startChat 中调用 processInput 即可。

go
func startChat() {
	scanner := bufio.NewScanner(os.Stdin)
	fmt.Println("我是 K8S Copilot,你可以向我咨询 K8S 相关问题")

	for {
		fmt.Print("> ")
		if scanner.Scan() {
			input := scanner.Text()
			if input == "exit" {
				fmt.Println("退出对话")
				break
			}
			if input == "" {
				continue
			}
			response := processInput(input)
			fmt.Println(response)
		}
	}
}

(5)执行效果如下

afeac18b0fefb8fd12f032d165061797 MD5

总结

本文探讨了如何将人工智能技术与 Kubernetes 运维实践相结合,成功构建了一个名为 k8scopilot 的智能命令行助手。通过整合 client-goCobraOpenAIFunction Calling 等关键技术,我们实现了一个能够理解自然语言指令并执行相应 Kubernetes 操作的 AI Agent。

我们首先介绍了与 Kubernetes 集群交互的核心工具 client-go,它是所有自动化操作的基石。接着,我们利用 Cobra 库高效地构建了结构清晰、易于扩展的命令行界面。最后,通过 OpenAI 的大语言模型,特别是其 Function Calling 能力,我们将模糊的自然语言转换为精确的、可编程的 API 调用,实现了从“人理解机器命令”到“机器理解人”的范式转变。

最近更新