Skip to content
0

开发 Operator 调度 GPU 实例资源池 原创

✍ 道路千万条,安全第一条。操作不规范,运维两行泪。

最近在学习《AIOps》相关的知识课程,为了让学习有一定的收获,所以将其进行了总结分享,如果你恰好也需要,很荣幸能帮到你。

前面我们介绍了《开发K8s Chat 命令行工具》和《开发 K8s GPT 故障诊断工具》两篇和 K8s 相关的文章,本篇文章我们将把 K8s、AI、云 三者结合起来,开发一个 AI 工具。

本章节将引入一个新的概念——K8s Operator,它是 K8s 的一种扩展形式,可以帮助用户以 K8s 声明式 API 的方式管理应用及服务,Operator 定义了一组在 Kubernetes 集群中打包和部署复杂业务应用的方法,主要是为解决特定应用或服务关于如何运行、部署及出现问题时如何处理提供的一种特定的自定义方式。比如:

  • 按需部署应用服务
  • 实现应用状态的备份和还原,完成版本升级
  • 数据库 schema 或额外的配置设置的改动

在 K8s 中我们使用的 Deployment、Daemonset、Statefulset 等这些都是 K8s 的资源,这些资源的创建、删除、更新等动作都会被称为事件,K8s 的 Controller Manager 负责事件的监听,并触发对应的动作来满足期望,这种方式就是声明式,即用户只需要关心应用程序的最终状态。当我们在使用中发现有些资源并不能满足日常的需求,对于这类需求可以使用 K8s 的自定义资源和 Operator 为应用程序提供基于 K8s 的扩展。

在这其中,CRD 就是对自定义资源的描述,如果要自定义资源,就需要先定义好 CRD,也就是介绍这个资源有什么属性,这些属性的类型、结构是怎样的。

比如 PG 的 Operator 如下:

yaml
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: postgresqls.acid.zalan.do
  labels:
    app.kubernetes.io/name: postgres-operator
  annotations:
    "helm.sh/hook": crd-install
spec:
  group: acid.zalan.do
  names:
    kind: postgresql
    listKind: postgresqlList
    plural: postgresqls
    singular: postgresql
    shortNames:
    - pg  additionalPrinterColumns:
  - name: Team
    type: string
    description: Team responsible for Postgres CLuster
    JSONPath: .spec.teamId
  - name: Version
    type: string
    description: PostgreSQL version
    JSONPath: .spec.postgresql.version
  - name: Pods
    type: integer
    description: Number of Pods per Postgres cluster
    JSONPath: .spec.numberOfInstances
  - name: Volume
    type: string
    description: Size of the bound volume
    JSONPath: .spec.volume.size

CRD 主要包括 apiVersion、kind、metadata 和 spec 四个部分。其中最关键的是 apiVersion 和 kind,apiVersion 表示资源所属组织和版本,apiVersion 一般由 APIGourp 和 Version 组成,这里的 APIGourp 是http://apiextensions.k8s.io,Version 是 v1beta1,相关信息可以通过kubectl api-resoures查看。kind 表示资源类型,这里是CustomResourceDefinition,表示是一个自定义的资源描述。

本文我们将自己开发一个 Operator 来维护 GPU 资源池的稳定,解决 AI 模型训练的基础平台的稳定性。其架构如下:

ee11ee9bb3ba2f232c0f78573956823f MD5

其中:

  • GPU 资源池采用的是腾讯云的竞价 GPU 实例
  • Operator 运行在 K8s 中,通过 SpootPool 控制 GPU 资源池的数量
  • 若云平台释放了某台 GPU 实例,当 Operator 监听到资源池数量和期望的不匹配,会自动补充到期望数量

Operator 的开发有多种脚手架,常用的有 operator-sdk、kubebuilder 等,这里我们将使用 kubebuilder 来完成 Operator 的开发。

前置条件

  • 准备一个可用的 K8s 集群,可以使用 kind、kubeadm、二进制等各种形式安装,如果使用 kubeadm 安装集群,可以参考 Kubernetes集群管理
  • 安装好 kubebuilder,可以参考 kubebuild快速安装
  • 准备好云平台的 AK,这里是采用腾讯云,其他云类似。

快速开始

1、设计 CRD

在开发之前需要先设计好 CRD(就像业务开发前先设计好表结构一样),本文的 CRD 主要包含云平台虚拟机的开通,包括最小和最大实例数,以及腾讯云 SDK 所需要的各种参数,比如地域、可用区、VPC、子网、安全组、镜像等。

最后 CRD 设计如下:

yaml
apiVersion: devops.jokerbai.com/v1
kind: Spotpool
metadata:
  labels:
    app.kubernetes.io/name: spotpool
    app.kubernetes.io/managed-by: kustomize
  name: spotpool-sample
spec:
  secretId: 密钥ID
  secretKey: 密钥Key
  region: 区域
  availabilityZone: 可用区
  instanceType: 实例类型
  minimum: 最小实例数
  maximum: 最大实例数
  subnetId: 子网ID
  vpcId: VPC ID
  securityGroupIds:
    - 安全组
  imageId: 镜像ID
  instanceChargeType: 实例付费类型

2、初始化项目

定义好 CRD 字段之后,我们先使用 kubebuilder 初始化一个 Operator 项目,命令如下:

(1)初始化项目

bash
mkdir spotpool && cd spotpool
kubebuilder init \
  --domain jokerbai.com \
  --repo github.com/joker-bai/spotpool \
  --project-name spotpool \
  --plugins go/v4 \
  --owner "Joker Bai"

(2)创建 API

bash
kubebuilder create api --group devops.jokerbai.com --version v1 --kind Spotpool

(3)生成后的目录结构大致如下

bash
.
├── api
│   └── v1
│       ├── groupversion_info.go
│       ├── spotpool_types.go
│       └── zz_generated.deepcopy.go
├── bin
│   ├── controller-gen -> /root/workspace/godev/src/github.com/joker-bai/spotpool/bin/controller-gen-v0.18.0
│   └── controller-gen-v0.18.0
├── cmd
│   └── main.go
├── config
│   ├── crd
│   │   ├── kustomization.yaml
│   │   └── kustomizeconfig.yaml
│   ├── default
│   │   ├── cert_metrics_manager_patch.yaml
│   │   ├── kustomization.yaml
│   │   ├── manager_metrics_patch.yaml
│   │   └── metrics_service.yaml
│   ├── manager
│   │   ├── kustomization.yaml
│   │   └── manager.yaml
│   ├── network-policy
│   │   ├── allow-metrics-traffic.yaml
│   │   └── kustomization.yaml
│   ├── prometheus
│   │   ├── kustomization.yaml
│   │   ├── monitor_tls_patch.yaml
│   │   └── monitor.yaml
│   ├── rbac
│   │   ├── kustomization.yaml
│   │   ├── leader_election_role_binding.yaml
│   │   ├── leader_election_role.yaml
│   │   ├── metrics_auth_role_binding.yaml
│   │   ├── metrics_auth_role.yaml
│   │   ├── metrics_reader_role.yaml
│   │   ├── role_binding.yaml
│   │   ├── role.yaml
│   │   ├── service_account.yaml
│   │   ├── spotpool_admin_role.yaml
│   │   ├── spotpool_editor_role.yaml
│   │   └── spotpool_viewer_role.yaml
│   └── samples
│       ├── devops.jokerbai.com_v1_spotpool.yaml
│       └── kustomization.yaml
├── Dockerfile
├── go.mod
├── go.sum
├── hack
│   └── boilerplate.go.txt
├── internal
│   └── controller
│       ├── spotpool_controller.go
│       ├── spotpool_controller_test.go
│       └── suite_test.go
├── Makefile
├── PROJECT
├── README.md
└── test
    ├── e2e
    │   ├── e2e_suite_test.go
    │   └── e2e_test.go
    └── utils
        └── utils.go

3、CRD 开发

(1)定义 API

api/v1alpha1/spotpool_types.go中定义 CRD 的结构体,如下:

go
package v1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// EDIT THIS FILE!  THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required.  Any new fields you add must have json tags for the fields to be serialized.

// SpotpoolSpec defines the desired state of Spotpool
type SpotpoolSpec struct {
	// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
	// Important: Run "make" to regenerate code after modifying this file
	SecretId           string   `json:"secretId,omitempty"`
	SecretKey          string   `json:"secretKey,omitempty"`
	Region             string   `json:"region,omitempty"`
	AvaliableZone      string   `json:"availabilityZone,omitempty"`
	InstanceType       string   `json:"instanceType,omitempty"`
	SubnetId           string   `json:"subnetId,omitempty"`
	VpcId              string   `json:"vpcId,omitempty"`
	SecurityGroupId    []string `json:"securityGroupIds,omitempty"`
	ImageId            string   `json:"imageId,omitempty"`
	InstanceChargeType string   `json:"instanceChargeType,omitempty"`
	Minimum            int32    `json:"minimum,omitempty"`
	Maximum            int32    `json:"maximum,omitempty"`
}

// SpotpoolStatus defines the observed state of Spotpool
type SpotpoolStatus struct {
	// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
	// Important: Run "make" to regenerate code after modifying this file
	Size       int32              `json:"size,omitempty"`
	Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type" protobuf:"bytes,rep,name=conditions"`
	Instances  []Instances        `json:"instances,omitempty"`
}

type Instances struct {
	InstanceId string `json:"instanceId,omitempty"`
	PublicIp   string `json:"publicIp,omitempty"`
}

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status

// Spotpool is the Schema for the spotpools API
type Spotpool struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   SpotpoolSpec   `json:"spec,omitempty"`
	Status SpotpoolStatus `json:"status,omitempty"`
}

//+kubebuilder:object:root=true

// SpotpoolList contains a list of Spotpool
type SpotpoolList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`
	Items           []Spotpool `json:"items"`
}

func init() {
	SchemeBuilder.Register(&Spotpool{}, &SpotpoolList{})
}

SpotpoolSpec 中定义设计的 CRD 结构体,这些字段都是创建虚拟机的必要字段。另外,在 SpotpoolStatus 中定义返回状态里的信息,这里只需要 Instance 相关的信息。

(2)生成代码

API 相关的代码开发完后,执行以下命令生成代码:

bash
make generate
make manifests

4、Controller 开发

(1)开发控制器逻辑

控制器的主逻辑是:

  • 从云平台获取运行的实例数
  • 判断实例数和期望的实例数是否相等
    • 如果小于期望值,则创建实例
    • 如果大于期望值,则删除实例

所以主逻辑的代码如下,修改internal/controller/spotpool_controller.go

go
func (r *SpotpoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	log := logf.FromContext(ctx)

	// 获取用户期望
	spotpool := &devopsjokerbaicomv1.Spotpool{}
	if err := r.Get(ctx, req.NamespacedName, spotpool); err != nil {
		log.Error(err, "unable to fetch spotspool")
	}

	// 从云平台获取获取运行的实例
	runningVmList, err := r.getRunningInstanceIds(spotpool)
	if err != nil {
		log.Error(err, "get running vm instance failed")
		// 十秒后重试
		return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
	}

	runningCount := len(runningVmList)

	switch {
	case runningCount < int(spotpool.Spec.Minimum):
		// 创建实例扩容
		delta := spotpool.Spec.Minimum - int32(runningCount)
		log.Info("creating instances", "delta", delta)
		err = r.runInstances(spotpool, delta)
		if err != nil {
			log.Error(err, "unable to create instances")
			return ctrl.Result{RequeueAfter: 40 * time.Second}, nil
		}
	case runningCount > int(spotpool.Spec.Maximum):
		// 删除实例缩容
		delta := int32(runningCount) - spotpool.Spec.Maximum
		log.Info("terminating instances", "delta", delta)
		err = r.terminateInstances(spotpool, delta)
		if err != nil {
			log.Error(err, "unable to terminate instances")
			return ctrl.Result{RequeueAfter: 40 * time.Second}, nil
		}
	}

	return ctrl.Result{RequeueAfter: 40 * time.Second}, nil
}

其中:

  • r.getRunningInstanceIds(spotpool) 用户获取云平台运行的实例数
  • r.runInstances(spotpool, delta) 用于调用云平台进行扩容
  • r.terminateInstances(spotpool, delta) 用于调用云平台进行缩容

接下来分别实现上面的三个方法。

(1)首先,实现 getRunningInstanceIds 方法

go
func (r *SpotpoolReconciler) getRunningInstanceIds(spotpool *devopsjokerbaicomv1.Spotpool) ([]string, error) {
	client, err := r.createCVMClient(spotpool.Spec)
	if err != nil {
		return nil, err
	}

	request := cvm.NewDescribeInstancesRequest()
	response, err := client.DescribeInstances(request)
	if err != nil {
		return nil, err
	}
	var instances []devopsjokerbaicomv1.Instances
	var runningInstanceIDs []string
	for _, instance := range response.Response.InstanceSet {
		if *instance.InstanceState == "RUNNING" || *instance.InstanceState == "PENDING" || *instance.InstanceState == "STARTING" {
			runningInstanceIDs = append(runningInstanceIDs, *instance.InstanceId)
		}
		// 检查实例的公网 IP,如果不存在公网 IP,则继续重试
		if len(instance.PublicIpAddresses) == 0 {
			return nil, fmt.Errorf("instance %s does not have public ip", *instance.InstanceId)
		}
		instances = append(instances, devopsjokerbaicomv1.Instances{
			InstanceId: *instance.InstanceId,
			PublicIp:   *instance.PublicIpAddresses[0],
		})
	}
	// 更新 status
	spotpool.Status.Instances = instances
	err = r.Status().Update(context.Background(), spotpool)
	if err != nil {
		return nil, err
	}
	return runningInstanceIDs, nil
}

// 获取腾讯云 SDK client
func (r *SpotpoolReconciler) createCVMClient(spec devopsjokerbaicomv1.SpotpoolSpec) (*cvm.Client, error) {
	credential := common.NewCredential(spec.SecretId, spec.SecretKey)
	cpf := profile.NewClientProfile()
	cpf.HttpProfile.ReqMethod = "POST"
	cpf.HttpProfile.ReqTimeout = 30
	cpf.SignMethod = "HmacSHA1"

	client, err := cvm.NewClient(credential, spec.Region, cpf)
	if err != nil {
		return nil, err
	}
	return client, nil
}

其中:

  • 调用 r.createCVMClient(spotpool.Spec) 获取腾讯云SDK client
  • 然后调用 client.DescribeInstances(request) 获取实例详细信息
  • 最后通过判断 instance.InstanceStatinstance.PublicIpAddresses 的状态信息决定是否是需要的实例
  • 最后返回实例列表信息

(2)实现 r.runInstances(spotpool, delta) 用于调用云平台进行扩容

go
func (r *SpotpoolReconciler) runInstances(spotpool *devopsjokerbaicomv1.Spotpool, count int32) error {
	client, err := r.createCVMClient(spotpool.Spec)
	if err != nil {
		return err
	}
	request := cvm.NewRunInstancesRequest()
	request.ImageId = common.StringPtr(spotpool.Spec.ImageId)
	request.Placement = &cvm.Placement{
		Zone: common.StringPtr(spotpool.Spec.AvaliableZone),
	}
	request.InstanceChargeType = common.StringPtr(spotpool.Spec.InstanceChargeType)
	request.InstanceCount = common.Int64Ptr(int64(count))
	request.InstanceName = common.StringPtr("spotpool" + time.Now().Format("20060102150405"))
	request.InstanceType = common.StringPtr(spotpool.Spec.InstanceType)
	request.InternetAccessible = &cvm.InternetAccessible{
		InternetChargeType:      common.StringPtr("BANDWIDTH_POSTPAID_BY_HOUR"),
		InternetMaxBandwidthOut: common.Int64Ptr(1),
		PublicIpAssigned:        common.BoolPtr(true),
	}
	request.LoginSettings = &cvm.LoginSettings{
		Password: common.StringPtr("Password123"),
	}
	request.SecurityGroupIds = common.StringPtrs(spotpool.Spec.SecurityGroupId)
	request.SystemDisk = &cvm.SystemDisk{
		DiskType: common.StringPtr("CLOUD_BSSD"),
		DiskSize: common.Int64Ptr(100),
	}
	request.VirtualPrivateCloud = &cvm.VirtualPrivateCloud{
		SubnetId: common.StringPtr(spotpool.Spec.SubnetId),
		VpcId:    common.StringPtr(spotpool.Spec.VpcId),
	}

	// print request
	fmt.Println(request.ToJsonString())

	// 创建实例
	response, err := client.RunInstances(request)
	if _, ok := err.(*errors.TencentCloudSDKError); ok {
		return err
	}
	// other errors
	if err != nil {
		return err
	}

	// 获取到返回的 instancesid
	instanceIds := make([]string, 0, len(response.Response.InstanceIdSet))
	for _, instanceId := range response.Response.InstanceIdSet {
		instanceIds = append(instanceIds, *instanceId)
	}

	fmt.Println("run instances success", instanceIds)
	// 更新 status
	_, err = r.getRunningInstanceIds(spotpool)
	if err != nil {
		return err
	}
	return nil
}

这个方法主要是调用 client.RunInstances(request) 进行实例创建,然后调用 r.getRunningInstanceIds(spotpool) 更新 status 的状态信息。

(3)开发r.terminateInstances(spotpool, delta) 用于调用云平台进行缩容

go
func (r *SpotpoolReconciler) terminateInstances(spotpool *devopsjokerbaicomv1.Spotpool, count int32) error {
	client, err := r.createCVMClient(spotpool.Spec)
	if err != nil {
		return err
	}

	runningInstances, err := r.getRunningInstanceIds(spotpool)
	if err != nil {
		return err
	}

	instancesIds := runningInstances[:count]
	request := cvm.NewTerminateInstancesRequest()
	request.InstanceIds = common.StringPtrs(instancesIds)

	// 获取返回
	response, err := client.TerminateInstances(request)
	if _, ok := err.(*errors.TencentCloudSDKError); ok {
		return err
	}
	// other errors
	if err != nil {
		return err
	}

	fmt.Println("Terminate response: ", response)
	fmt.Println("terminate instances success", instancesIds)

	// 更新 status
	_, err = r.getRunningInstanceIds(spotpool)
	if err != nil {
		return err
	}
	return nil
}

删除实例和创建实例的实现逻辑类似,先调用 client.TerminateInstances(request) 进行删除,然后调用 r.getRunningInstanceIds(spotpool) 更新状态。

上面三个步骤完成了主要逻辑开发,可以初步实现具体的效果,如果希望功能更健全,则需要对其进行开发优化。

部署和测试

1、本地测试

bash
# 安装 CRD
make install
# 运行 controller
make run

2、创建 Spotpool 实例测试

(1)创建 Spotpool 资源清单,编辑 config/samples/devops.jokerbai.com_v1_spotpool.yaml

yaml
apiVersion: devops.jokerbai.com.jokerbai.com/v1
kind: Spotpool
metadata:
  labels:
    app.kubernetes.io/name: spotpool
    app.kubernetes.io/managed-by: kustomize
  name: spotpool-sample
spec:
  secretId: xxx
  secretKey: xxx
  region: ap-singapore
  availabilityZone: ap-singapore-2
  instanceType: "GN7.2XLARGE32"
  minimum: 2
  maximum: 2
  subnetId: DEFAULT
  vpcId: DEFAULT
  securityGroupIds:
    - sg-xxx
  imageId: img-xxx
  instanceChargeType: SPOTPAID

(2)运行资源清单

bash
# 创建实例
kubectl apply -f config/samples/devops.jokerbai.com_v1_spotpool.yaml

# 查看状态
kubectl get spotpool

(3)构建并部署到集群

bash
# 构建镜像
make docker-build docker-push IMG=<your-registry>/spotpool:v1

# 部署到集群
make deploy IMG=<your-registry>/spotpool:v1

(4)清理

bash
# 删除 operator
make undeploy

# 删除 CRD
make uninstall

最后

本文通过结合 Kubernetes、AI 和云平台,深入探讨了如何利用 K8s Operator 实现对 GPU 资源池的自动化管理。我们从 Operator 的核心概念出发,介绍了 CRD(自定义资源定义)和控制器的设计原理,并基于 kubebuilder 开发了一个名为 Spotpool 的 Operator,用于在腾讯云上维护竞价实例的稳定运行。

整个开发过程遵循“声明式 API”的思想,用户只需定义期望的状态(如最小/最大实例数),Operator 便会在后台持续监控并自动调整实际状态,确保资源池始终符合预期。这不仅极大地简化了运维操作,也提升了 AI 模型训练平台的稳定性和弹性。

Operator 是云原生时代自动化运维的重要利器。掌握其开发方法,意味着我们不仅能“用好” Kubernetes,更能“扩展” Kubernetes,为复杂业务场景提供定制化的解决方案。

最近更新