Skip to content

Job管理 原创

1、功能

15c97eafdde6184a876a0f8a49c4f5b1 MD5

2、类型转换

2.1、实现类型转换

internal/pkg/k8s/job/common.go 文件中新增 JobCell,实现和 batchv1.Job 进行类型转换,如下:

go
package job

import (
	batchv1 "k8s.io/api/batch/v1"
	"time"

	"github.com/joker-bai/hawkeye/internal/pkg/k8s/dataselect"
)

// JobCell is batchv1.Job under a local name so that it can carry the accessor
// methods below and be stored as a dataselect.DataCell.
type JobCell batchv1.Job

// GetCreation returns the creation timestamp recorded in the Job metadata.
func (c JobCell) GetCreation() time.Time {
	created := c.CreationTimestamp
	return created.Time
}

// GetName returns the name recorded in the Job metadata.
func (c JobCell) GetName() string {
	return c.ObjectMeta.Name
}

// toCells converts a slice of batchv1.Job into dataselect.DataCell values by
// wrapping every element in a JobCell. The result has the same length and
// order as sts.
func toCells(sts []batchv1.Job) []dataselect.DataCell {
	out := make([]dataselect.DataCell, 0, len(sts))
	for idx := range sts {
		out = append(out, JobCell(sts[idx]))
	}
	return out
}

// fromCells converts DataCell values back into batchv1.Job, keeping length
// and order. Every cell must hold a JobCell: the type assertion is unchecked
// and panics on any other dynamic type.
func fromCells(cells []dataselect.DataCell) []batchv1.Job {
	out := make([]batchv1.Job, 0, len(cells))
	for _, cell := range cells {
		out = append(out, batchv1.Job(cell.(JobCell)))
	}
	return out
}

2.2、实现增删改查

(1)创建 internal/pkg/k8s/job/create.go 文件,输入以下内容,用于创建 job

go
package job

import (
	"context"
	"github.com/joker-bai/hawkeye/global"
	"github.com/joker-bai/hawkeye/internal/app/requests"
	"github.com/joker-bai/hawkeye/internal/pkg/k8s/common"
	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"strings"
)

// DescriptionAnnotationKey is the annotation key under which CreateJob stores
// the optional Job description.
const DescriptionAnnotationKey = "description"

// CreateJob builds a single-container batchv1.Job from the request and creates
// it in job.Namespace.
//
// It returns an error when the memory or CPU requirement is not a valid
// resource quantity, or when the API server rejects the Job.
func CreateJob(job *requests.K8sJobCreateRequest) error {
	annotations := map[string]string{}
	if job.Description != nil {
		annotations[DescriptionAnnotationKey] = *job.Description
	}

	// The same metadata is used for the Job object and for its pod template.
	metadata := metav1.ObjectMeta{
		Annotations: annotations,
		Labels:      common.GetLabelsMap(job.Labels),
		Name:        job.Name,
	}

	containerSpec := corev1.Container{
		Name:  job.Name,
		Image: job.ContainerImage,
		SecurityContext: &corev1.SecurityContext{
			Privileged: &job.RunAsPrivileged,
		},
		Resources: corev1.ResourceRequirements{Requests: make(map[corev1.ResourceName]resource.Quantity)},
		Env:       common.ConvertEnvVarsSpec(job.Variables),
	}

	if job.ContainerCommand != nil {
		containerSpec.Command = []string{*job.ContainerCommand}
	}
	if job.ContainerCommandArgs != nil {
		// Arguments arrive as one whitespace-separated string.
		containerSpec.Args = strings.Fields(*job.ContainerCommandArgs)
	}

	// The quantities come from the request, so parse them with an error
	// return instead of resource.MustParse, which panics on malformed input.
	if job.MemoryRequirement != nil {
		qty, err := resource.ParseQuantity(*job.MemoryRequirement)
		if err != nil {
			return err
		}
		containerSpec.Resources.Requests[corev1.ResourceMemory] = qty
	}
	if job.CpuRequirement != nil {
		qty, err := resource.ParseQuantity(*job.CpuRequirement)
		if err != nil {
			return err
		}
		containerSpec.Resources.Requests[corev1.ResourceCPU] = qty
	}

	// Each probe is built from its own settings; the liveness probe used to
	// take its handler from the readiness configuration.
	if job.IsReadinessEnable {
		containerSpec.ReadinessProbe = newJobProbe(job.ReadinessProbe)
	}
	if job.IsLivenessEnable {
		containerSpec.LivenessProbe = newJobProbe(job.LivenessProbe)
	}

	podSpec := corev1.PodSpec{
		Containers: []corev1.Container{containerSpec},
		// A Job pod template must use Never or OnFailure; leaving the field
		// empty defaults to Always, which the API server rejects for Jobs.
		RestartPolicy: corev1.RestartPolicyNever,
	}
	if job.ImagePullSecret != nil {
		podSpec.ImagePullSecrets = []corev1.LocalObjectReference{{Name: *job.ImagePullSecret}}
	}

	// Spec.Selector is intentionally left unset: unless ManualSelector is
	// true, the API server generates the selector itself and rejects a
	// user-supplied one.
	jobSpec := batchv1.JobSpec{
		Template: corev1.PodTemplateSpec{
			ObjectMeta: metadata,
			Spec:       podSpec,
		},
		// Nil pointers leave the corresponding field unset.
		ActiveDeadlineSeconds: job.ActiveDeadlineSeconds,
		Parallelism:           job.Parallelism,
		BackoffLimit:          job.BackoffLimit,
		Completions:           job.Completions,
	}

	jobTemp := &batchv1.Job{
		ObjectMeta: metadata,
		Spec:       jobSpec,
	}

	if _, err := global.K8S.BatchV1().Jobs(job.Namespace).Create(context.TODO(), jobTemp, metav1.CreateOptions{}); err != nil {
		return err
	}

	// TODO: create a Service as well when job.IsCreateService is set.

	return nil
}

// newJobProbe converts the health-check settings of a request into a
// container probe.
func newJobProbe(detail requests.HealthCheckDetail) *corev1.Probe {
	return &corev1.Probe{
		ProbeHandler:        common.GetContainerProbe(detail),
		InitialDelaySeconds: detail.InitialDelaySeconds,
		TimeoutSeconds:      detail.TimeoutSeconds,
		PeriodSeconds:       detail.PeriodSeconds,
		SuccessThreshold:    detail.SuccessThreshold,
		FailureThreshold:    detail.FailureThreshold,
	}
}

(2)在 internal/pkg/k8s/job/delete.go 中创建以下内容,用于删除 job

go
package job

import (
	"context"
	"github.com/joker-bai/hawkeye/global"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// DeleteJob deletes the Job called name from namespace with default delete
// options and returns the error reported by the client, if any.
func DeleteJob(name, namespace string) error {
	jobs := global.K8S.BatchV1().Jobs(namespace)
	return jobs.Delete(context.TODO(), name, metav1.DeleteOptions{})
}

(3)在 internal/pkg/k8s/job/list.go 中创建以下内容,用于列出 job 列表

go
package job

import (
	"context"
	"github.com/joker-bai/hawkeye/global"
	"github.com/joker-bai/hawkeye/internal/pkg/k8s/dataselect"
	batchv1 "k8s.io/api/batch/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// ListJob fetches every Job in namespace and then runs the result through
// dataselect: filter (using name), sort, and paginate (using page and limit).
// It returns the Jobs of the selected page, or the error from the list call.
func ListJob(name, namespace string, page, limit int) ([]batchv1.Job, error) {
	list, err := global.K8S.BatchV1().Jobs(namespace).List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		return nil, err
	}

	query := &dataselect.DataSelectQuery{
		Filter:   &dataselect.FilterQuery{Name: name},
		Paginate: &dataselect.PaginateQuery{Limit: limit, Page: page},
	}
	selector := dataselect.DataSelector{
		GenericDataList: toCells(list.Items),
		DataSelectQuery: query,
	}

	// Filter first, then sort what is left and cut out the requested page.
	filtered := selector.Filter()
	paged := filtered.Sort().Paginate()
	return fromCells(paged.GenericDataList), nil
}

(4)在 internal/pkg/k8s/job/update.go 中创建以下内容,用于更新 job

go
package job

import (
	"context"
	"encoding/json"
	"github.com/joker-bai/hawkeye/global"
	batchv1 "k8s.io/api/batch/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// UpdateJob decodes content as a JSON-encoded batchv1.Job and submits it as an
// update through the Jobs client of namespace. It returns the decoding error
// or the error reported by the update call.
func UpdateJob(namespace, content string) error {
	updated := new(batchv1.Job)
	if err := json.Unmarshal([]byte(content), updated); err != nil {
		return err
	}

	_, err := global.K8S.BatchV1().Jobs(namespace).Update(context.TODO(), updated, metav1.UpdateOptions{})
	return err
}

(5)在 internal/pkg/k8s/job/detail.go 中创建以下内容,用于获取 job 详情

go
package job

import (
	"context"
	"github.com/joker-bai/hawkeye/global"
	batchv1 "k8s.io/api/batch/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// GetJobDetail fetches the Job called name from namespace. On failure it
// returns a nil Job together with the client error.
func GetJobDetail(name, namespace string) (*batchv1.Job, error) {
	client := global.K8S.BatchV1().Jobs(namespace)
	found, err := client.Get(context.TODO(), name, metav1.GetOptions{})
	if err != nil {
		// Drop whatever object the client returned alongside the error.
		return nil, err
	}
	return found, nil
}

(6)在 internal/pkg/k8s/job/pods.go 中新增以下内容,用于获取 job 的 Pod

go
package job

import (
	"context"
	"github.com/joker-bai/hawkeye/global"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// GetPods returns the Pods in namespace that match the label selector of the
// Job called name.
//
// It returns an error when the Job cannot be fetched, when its selector cannot
// be converted into a label query, or when listing the Pods fails. A Job
// without a selector yields no Pods.
func GetPods(name, namespace string) ([]corev1.Pod, error) {
	d, err := global.K8S.BatchV1().Jobs(namespace).Get(context.TODO(), name, metav1.GetOptions{})
	if err != nil {
		return nil, err
	}

	// Without a selector there is nothing to match on; an empty query string
	// would list every Pod in the namespace instead.
	if d.Spec.Selector == nil {
		return nil, nil
	}

	// LabelSelector.String() is the generated debug representation of the
	// struct, not a label query, so convert it to a labels.Selector first.
	selector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector)
	if err != nil {
		return nil, err
	}

	pods, err := global.K8S.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: selector.String()})
	if err != nil {
		return nil, err
	}
	return pods.Items, nil
}

3、实现 services 方法

3.1、请求参数校验

internal/app/requests 目录中新建 k8s_job.go 文件,写入以下内容以完成请求参数校验:

go
package requests

import (
	"github.com/gin-gonic/gin"
	"github.com/joker-bai/hawkeye/pkg/app"
	"github.com/thedevsaddam/govalidator"
)

// K8sJobCreateRequest is the request body for creating a Job.
type K8sJobCreateRequest struct {
	Name                  string                `json:"name" valid:"name"`                                       // Job name
	ContainerImage        string                `json:"container_image" valid:"container_image"`                 // Container image name
	ImagePullSecret       *string               `json:"image_pull_secret" valid:"image_pull_secret"`             // Secret used to pull the image
	ContainerCommand      *string               `json:"container_command" valid:"container_command"`             // Command run when the container starts
	ContainerCommandArgs  *string               `json:"container_command_args" valid:"container_command_args"`   // Arguments for the start command
	PortMappings          []PortMapping         `json:"port_mappings" valid:"port_mappings"`                     // Container ports
	Variables             []EnvironmentVariable `json:"variables" valid:"variables"`                             // Environment variables
	IsCreateService       bool                  `json:"is_create_service" valid:"is_create_service"`             // Whether to create a Service
	Description           *string               `json:"description" valid:"description"`                         // Description
	Namespace             string                `json:"namespace" valid:"namespace"`                             // Namespace
	MemoryRequirement     *string               `json:"memory_requirement" valid:"memory_requirement"`           // Memory request
	CpuRequirement        *string               `json:"cpu_requirement" valid:"cpu_requirement"`                 // CPU request
	Labels                []Label               `json:"labels" valid:"labels"`                                   // Labels
	RunAsPrivileged       bool                  `json:"run_as_privileged" valid:"run_as_privileged"`             // Whether the container runs privileged
	IsReadinessEnable     bool                  `json:"is_readiness_enable" valid:"is_readiness_enable"`         // Whether the ReadinessProbe health check is enabled
	ReadinessProbe        HealthCheckDetail     `json:"readiness_probe" valid:"readiness_probe"`                 // ReadinessProbe settings
	IsLivenessEnable      bool                  `json:"is_liveness_enable" valid:"is_liveness_enable"`           // Whether the LivenessProbe health check is enabled
	LivenessProbe         HealthCheckDetail     `json:"liveness_probe" valid:"liveness_probe"`                   // LivenessProbe settings
	ActiveDeadlineSeconds *int64                `json:"active_deadline_seconds" valid:"active_deadline_seconds"` // Timeout in seconds; original note says default 600s, not set in the visible code — TODO confirm
	BackoffLimit          *int32                `json:"backoff_limit" valid:"backoff_limit"`                     // Retry count; original note says default 6, not set in the visible code — TODO confirm
	Completions           *int32                `json:"completions" valid:"completions"`                         // Number of Pods that must complete successfully
	Parallelism           *int32                `json:"parallelism" valid:"parallelism"`                         // Number of Pods run in parallel
}

// ValidK8sJobCreateRequest validates a Job creation request: name, namespace
// and container_image are required. It returns whatever app.ValidateOptions
// reports for these rules.
//
// No rule is declared for "replicas": K8sJobCreateRequest has no such field,
// so a required rule on it could never be satisfied by a create request.
func ValidK8sJobCreateRequest(data interface{}, ctx *gin.Context) map[string][]string {
	rules := govalidator.MapData{
		"name":            []string{"required"},
		"namespace":       []string{"required"},
		"container_image": []string{"required"},
	}

	messages := govalidator.MapData{
		"namespace": []string{
			"required: namespace不能为空",
		},
		"name": []string{
			"required: name不能为空",
		},
		"container_image": []string{
			"required: image不能为空",
		},
	}

	return app.ValidateOptions(data, rules, messages)
}

// K8sJobUpdateRequest is the request body for updating a Job.
type K8sJobUpdateRequest struct {
	Namespace string `json:"namespace" valid:"namespace"` // Namespace passed to the update call
	Content   string `json:"content" valid:"content"`     // Job definition as text; presumably JSON-encoded — verify against the update handler
}

// ValidK8sJobUpdateRequest validates a Job update request: namespace and
// content are both required. It returns whatever app.ValidateOptions reports.
func ValidK8sJobUpdateRequest(data interface{}, ctx *gin.Context) map[string][]string {
	messages := govalidator.MapData{
		"content":   {"required: content 不能为空"},
		"namespace": {"required: namespace 不能为空"},
	}
	rules := govalidator.MapData{
		"content":   {"required"},
		"namespace": {"required"},
	}

	// Run the rules against the incoming data.
	return app.ValidateOptions(data, rules, messages)
}

// K8sJobListRequest holds the parameters for listing Jobs: the embedded
// K8sCommonRequest fields plus pagination.
type K8sJobListRequest struct {
	K8sCommonRequest
	Page  int `json:"page" form:"page" valid:"page"`    // Page number
	Limit int `json:"limit" form:"limit" valid:"limit"` // Items per page
}

// ValidK8sJobListRequest validates a Job list request: namespace, page and
// limit are all required. It returns whatever app.ValidateOptions reports.
func ValidK8sJobListRequest(data interface{}, ctx *gin.Context) map[string][]string {
	messages := govalidator.MapData{
		"limit":     {"required: limit不能为空"},
		"page":      {"required: page不能为空"},
		"namespace": {"required: namespace不能为空"},
	}
	rules := govalidator.MapData{
		"limit":     {"required"},
		"page":      {"required"},
		"namespace": {"required"},
	}

	// Run the rules against the incoming data.
	return app.ValidateOptions(data, rules, messages)
}

3.2、实现 services 方法

internal/app/services/k8s_job.go 文件中新增 Job 操作的 services 方法,如下:

go
package services

import (
	"github.com/joker-bai/hawkeye/internal/app/requests"
	"github.com/joker-bai/hawkeye/internal/pkg/k8s/job"
	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
)

// Job Service

// K8sJobList returns the Jobs selected by param by delegating to job.ListJob
// with the request's name, namespace, page and limit.
func (s *Services) K8sJobList(param *requests.K8sJobListRequest) ([]batchv1.Job, error) {
	name, namespace := param.Name, param.Namespace
	return job.ListJob(name, namespace, param.Page, param.Limit)
}

// K8sJobDelete deletes the Job identified by param.Name and param.Namespace
// by delegating to job.DeleteJob.
func (s *Services) K8sJobDelete(param *requests.K8sCommonRequest) error {
	name, namespace := param.Name, param.Namespace
	return job.DeleteJob(name, namespace)
}

// K8sJobUpdate updates a Job in param.Namespace from param.Content by
// delegating to job.UpdateJob.
func (s *Services) K8sJobUpdate(param *requests.K8sJobUpdateRequest) error {
	namespace, content := param.Namespace, param.Content
	return job.UpdateJob(namespace, content)
}

// K8sJobCreate creates a Job from param by delegating to job.CreateJob.
func (s *Services) K8sJobCreate(param *requests.K8sJobCreateRequest) error {
	if err := job.CreateJob(param); err != nil {
		return err
	}
	return nil
}

// K8sJobGetPod returns the Pods of the Job identified by param.Name and
// param.Namespace by delegating to job.GetPods.
func (s *Services) K8sJobGetPod(param *requests.K8sCommonRequest) ([]corev1.Pod, error) {
	pods, err := job.GetPods(param.Name, param.Namespace)
	return pods, err
}

// K8sJobDetail returns the Job identified by param.Name and param.Namespace
// by delegating to job.GetJobDetail.
func (s *Services) K8sJobDetail(param *requests.K8sCommonRequest) (*batchv1.Job, error) {
	detail, err := job.GetJobDetail(param.Name, param.Namespace)
	return detail, err
}

4、新增 controllers 方法

在 internal/app/controllers/api/v1/k8s 目录中新增 job.go 文件,实现如下方法:

go
package k8s

import (
	"github.com/gin-gonic/gin"
	"github.com/joker-bai/hawkeye/global"
	"github.com/joker-bai/hawkeye/internal/app/requests"
	"github.com/joker-bai/hawkeye/internal/app/services"
	"github.com/joker-bai/hawkeye/pkg/app"
	"github.com/joker-bai/hawkeye/pkg/errorcode"
	"go.uber.org/zap"
)

// JobController groups the HTTP handlers for Kubernetes Job management.
type JobController struct{}

// List godoc
// @Summary 列出K8s Job
// @Description 列出K8s Job
// @Tags K8s Job管理
// @Produce json
// @Param name query string false "Job名" maxlength(100)
// @Param namespace query string false "命名空间" maxlength(100)
// @Param page query int true "页码"
// @Param limit query int true "每页数量"
// @Success 200 {object} string "成功"
// @Failure 400 {object} errorcode.Error "请求错误"
// @Failure 500 {object} errorcode.Error "内部错误"
// @Router /api/v1/k8s/job/list [get]
func (k *JobController) List(ctx *gin.Context) {
	var param requests.K8sJobListRequest
	response := app.NewResponse(ctx)

	// Stop when validation fails; presumably app.Validate has already written
	// the error response — confirm.
	if !app.Validate(ctx, &param, requests.ValidK8sJobListRequest) {
		return
	}

	jobSvc := services.New(ctx)
	jobs, err := jobSvc.K8sJobList(&param)
	if err != nil {
		global.Log.Error("获取Job列表失败", zap.String("error", err.Error()))
		response.ToErrorResponse(errorcode.ErrorK8sJobListFail)
		return
	}

	// The count passed along is the size of the returned slice.
	total := len(jobs)
	response.ToResponseList(jobs, total)
}

// Update godoc
// @Summary 更新Job
// @Description 更新Job
// @Tags K8s Job管理
// @Produce json
// @Param body body requests.K8sJobUpdateRequest true "body"
// @Success 200 {object} string "成功"
// @Failure 400 {object} errorcode.Error "请求错误"
// @Failure 500 {object} errorcode.Error "内部错误"
// @Router /api/v1/k8s/job/update [post]
func (k *JobController) Update(ctx *gin.Context) {
	var param requests.K8sJobUpdateRequest
	response := app.NewResponse(ctx)

	// Stop when validation fails; presumably app.Validate has already written
	// the error response — confirm.
	if !app.Validate(ctx, &param, requests.ValidK8sJobUpdateRequest) {
		return
	}

	jobSvc := services.New(ctx)
	if err := jobSvc.K8sJobUpdate(&param); err != nil {
		global.Log.Error("更新Job失败", zap.String("error", err.Error()))
		response.ToErrorResponse(errorcode.ErrorK8sJobUpdateFail)
		return
	}

	payload := gin.H{"msg": "Job更新成功"}
	response.ToResponse(payload)
}

// Delete godoc
// @Summary 删除Job
// @Description 删除Job
// @Tags K8s Job管理
// @Produce json
// @Param body body requests.K8sCommonRequest true "body"
// @Success 200 {object} string "成功"
// @Failure 400 {object} errorcode.Error "请求错误"
// @Failure 500 {object} errorcode.Error "内部错误"
// @Router /api/v1/k8s/job/delete [post]
func (k *JobController) Delete(ctx *gin.Context) {
	var param requests.K8sCommonRequest
	response := app.NewResponse(ctx)

	// Stop when validation fails; presumably app.Validate has already written
	// the error response — confirm.
	if !app.Validate(ctx, &param, requests.ValidK8sCommonRequest) {
		return
	}

	jobSvc := services.New(ctx)
	if err := jobSvc.K8sJobDelete(&param); err != nil {
		global.Log.Error("删除Jobs失败", zap.String("error", err.Error()))
		response.ToErrorResponse(errorcode.ErrorK8sJobDeleteFail)
		return
	}

	payload := gin.H{"msg": "Job删除成功"}
	response.ToResponse(payload)
}

// Create godoc
// @Summary 创建Job
// @Description 创建Job
// @Tags K8s Job管理
// @Produce json
// @Param body body requests.K8sJobCreateRequest true "body"
// @Success 200 {object} string "成功"
// @Failure 400 {object} errorcode.Error "请求错误"
// @Failure 500 {object} errorcode.Error "内部错误"
// @Router /api/v1/k8s/job/create [post]
func (k *JobController) Create(ctx *gin.Context) {
	param := requests.K8sJobCreateRequest{}
	response := app.NewResponse(ctx)

	// Stop when validation fails; presumably app.Validate has already written
	// the error response — confirm.
	if ok := app.Validate(ctx, &param, requests.ValidK8sJobCreateRequest); !ok {
		return
	}

	svc := services.New(ctx)
	if err := svc.K8sJobCreate(&param); err != nil {
		global.Log.Error("创建Job失败", zap.String("error", err.Error()))
		response.ToErrorResponse(errorcode.ErrorK8sJobCreateFail)
		return
	}

	// The success message previously read "创建成果" (a typo for "创建成功").
	response.ToResponse(gin.H{
		"msg": "Job创建成功",
	})
}

// GetPods godoc
// @Summary 获取Job的Pod
// @Description 获取Job的Pod
// @Tags K8s Job管理
// @Produce json
// @Param name query string false "Job名" maxlength(100)
// @Param namespace query string false "命名空间" maxlength(100)
// @Param container query string false "容器" maxlength(100)
// @Success 200 {object} string "成功"
// @Failure 400 {object} errorcode.Error "请求错误"
// @Failure 500 {object} errorcode.Error "内部错误"
// @Router /api/v1/k8s/job/pods [get]
func (k *JobController) GetPods(ctx *gin.Context) {
	param := requests.K8sCommonRequest{}
	response := app.NewResponse(ctx)

	// Use the K8sCommonRequest validator, as Delete and Detail do for the same
	// request type; the ID validator used before does not belong to this type.
	if ok := app.Validate(ctx, &param, requests.ValidK8sCommonRequest); !ok {
		return
	}

	svc := services.New(ctx)
	pods, err := svc.K8sJobGetPod(&param)
	if err != nil {
		// The log text previously repeated the leading verb.
		global.Log.Error("获取Job的Pod失败", zap.String("error", err.Error()))
		response.ToErrorResponse(errorcode.ErrorK8sJobGetPodsFail)
		return
	}

	response.ToResponse(gin.H{
		"data": pods,
		"msg":  "获取Job的Pod成功",
	})
}

// Detail godoc
// @Summary 获取Job的详情
// @Description 获取Job的详情
// @Tags K8s Job管理
// @Produce json
// @Param name query string false "Job名" maxlength(100)
// @Param namespace query string false "命名空间" maxlength(100)
// @Success 200 {object} string "成功"
// @Failure 400 {object} errorcode.Error "请求错误"
// @Failure 500 {object} errorcode.Error "内部错误"
// @Router /api/v1/k8s/job/detail [get]
func (k *JobController) Detail(ctx *gin.Context) {
	param := requests.K8sCommonRequest{}
	response := app.NewResponse(ctx)

	// Stop when validation fails; presumably app.Validate has already written
	// the error response — confirm.
	if ok := app.Validate(ctx, &param, requests.ValidK8sCommonRequest); !ok {
		return
	}

	svc := services.New(ctx)
	job, err := svc.K8sJobDetail(&param)
	if err != nil {
		// The log text previously repeated the leading verb.
		global.Log.Error("获取Job的详情失败", zap.String("error", err.Error()))
		response.ToErrorResponse(errorcode.ErrorK8sJobDetailFail)
		return
	}

	response.ToResponse(gin.H{
		"data": job,
		"msg":  "获取Job的详情成功",
	})
}

再到 pkg/errorcode/k8s.go 文件中新增如下错误代码:

go
package errorcode

var (
	......
	// Error codes for K8s Job operations.
	// NOTE(review): 500055 is skipped in this sequence — confirm it is intentional.

	ErrorK8sJobUpdateFail  = NewError(500051, "更新K8s Job 失败")
	ErrorK8sJobDeleteFail  = NewError(500052, "删除K8s Job 失败")
	ErrorK8sJobListFail    = NewError(500053, "获取K8s Job 列表失败")
	ErrorK8sJobDetailFail  = NewError(500054, "获取K8s Job 详情失败")
	ErrorK8sJobGetPodsFail = NewError(500056, "获取K8s Job 的Pod失败")
	ErrorK8sJobCreateFail  = NewError(500057, "创建K8s Job 失败")
)

5、新增路由

internal/app/routers/k8s.go 文件中新增 Job 操作的路由,如下:

go
package routers

import (
	"github.com/gin-gonic/gin"
	v1 "github.com/joker-bai/kubemana/internal/app/controllers/api/v1"
)

// K8sRouter registers the Kubernetes-related HTTP routes.
type K8sRouter struct{}

// Inject registers the /k8s route group and its handlers on router.
func (r *K8sRouter) Inject(router *gin.RouterGroup) {

	// The group is named ks: a local variable called k8s would shadow the k8s
	// controllers package used below and leave the ks.* registrations
	// undefined.
	// NOTE(review): the k8s controllers package must be imported in this file;
	// the import block shown here only lists the v1 package — confirm.
	ks := router.Group("/k8s")
	{
		......

		// Job management
		jc := new(k8s.JobController)
		ks.GET("/job/list", jc.List)
		ks.POST("/job/create", jc.Create)
		ks.POST("/job/update", jc.Update)
		ks.POST("/job/delete", jc.Delete)
		ks.GET("/job/pods", jc.GetPods)
		ks.GET("/job/detail", jc.Detail)
	}
}

6、测试一下

PS:测试之前都需要先初始化集群,在 4.3.1 Pod 章节有介绍。

这里简单测试列出 Job 接口,如下:

94bd8437cc9d32684d695ea41477af32 MD5

其他接口自行下去测试。

7、代码版本

本节开发完成后,记得生成 swag 和标记代码版本,如下:

go
$ swag init
$ git add .
$ git commit -m "新增k8s集群job操作"
最近更新