Black-Hole's Blog

In love

基于任务量进行k8s集群的灵活调度处理

Posted at — Aug 20, 2019

前言

最近公司内有个需求是为了进一步控制某个项目的k8s集群的资源,避免资源浪费。

目前项目需要的资源占用率很高,需要3核CPU、2G内存。在一开始的时候是没有做灵活调度处理的。会让Pod一直处于运行状态,即使没有任务的时候也会一直运行,虽然说可以通过k8sResourcesRequestsLimits减少一点资源,但是还是会照成一定资源的浪费。

介绍

在正文开始前,需要把流程介绍一下,方便后文的理解。

首先别的部门会往数据库里插入一条数据,然后在由调度器去定期的扫数据库,扫到一个新数据,则由调度器去调用k8s的api去创建一个Job资源,在Job里有一个Pod,由Pod去做一些任务。然后结束。

看起来比较简单,但是有几个需要注意的地方:

  1. 由于Pod是需要环境变量的,而Pod是由调度器去创建的。那么这个时候就需要把变量一步步传进去
  2. 调度器不能去更改任何的数据,只能从数据库里拿,这是为了更好的解耦。不能让调度器去关心任何的业务逻辑及数据
  3. 调度器的本身不能存有任何的状态,因为一旦涉及到状态,就要去有个地方去存储它。因为要考虑到调度器本身重启。这样做只会带来更大的负担。
  4. 需要考虑当前的集群是否有资源再启动Pod

开始

调度器使用了GoLang进行开发,所以后文都将使用Go做为主力语言。

创建一个可调试的k8s环境

目前因为使用的是Go进行开发,所以使用了k8s官方的client-go这个库。而这个库本身就提供了一些创建clientset的方法(可以把clientset理解成一个可以和当前集群master通信的管道)

package main

import (
  "fmt"

  "k8s.io/client-go/kubernetes"
  "k8s.io/client-go/rest"
  "k8s.io/client-go/tools/clientcmd"
)

func main() {
  // 这个方法里包含了k8s自身对Cluster的配置操作
  kubeConfig, err := rest.InClusterConfig()

  if err != nil {
    // 如果是开发环境,则使用当前minikube。需要配置KUBECONFIG变量
    // 如果是minikube,KUBECONFIG变量可以指向$HOME/.kube/config
    kubeConfig, err = clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
      clientcmd.NewDefaultClientConfigLoadingRules(),
      &clientcmd.ConfigOverrides{}).ClientConfig()

    // 如果没有配置KUBECONFIG变量,且当前也没有在集群里运行
    if err != nil {
      panic("get k8s config fail: " + err.Error())
    }
  }

  // 创建clientset失败
  clientset, err := kubernetes.NewForConfig(kubeConfig)
  if err != nil {
    panic("failed to create k8s clientset: " + err.Error())
  }

  // 创建成功
  fmt.Println(clientset)
}

其中rest.InClusterConfig()代码也十分简单,就是去当前机器下的/var/run/secrets/kubernetes.io/serviceaccount/读取tokenca。以及读取KUBERNETES_SERVICE_HOSTKUBERNETES_SERVICE_PORT环境变量,再把他们拼在一起,感兴趣的同学可以去看下源码

根据上文可以知道rest.InClusterConfig()是针对以及身在集群中的机器而言的。在本地开发环境是肯定不行的。所以我们需要另一个方法去解决这个问题。

可以看到上面我已经做了处理,当发现InClusterConfig失败后,会转而执行下面的代码:

kubeConfig, err = clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
  clientcmd.NewDefaultClientConfigLoadingRules(),
  &clientcmd.ConfigOverrides{}).ClientConfig()

这段代码其实也比较简单,就是去读取当前环境下的KUBECONFIG获取本地k8s的配置路径。如果没有这个变量,再去获取当前用户目录下的.kube/config文件。最终根据文件改造成所需要的配置。主要源码可见: NewDefaultClientConfigLoadingRulesClientConfig

现在只要保证你本机有minikube环境就可以正常调试、开发了。

以上的方法参考rook的写法

创建Job及Pod

数据库查询的这里就不再阐述了,可以根据自身的业务进行适配、开发。这里只是起到一个抛砖引玉的效果。不止是数据库,其他任何东西都可以,主要还是要看自身的业务适合什么。

我们先假设,这里从数据库里拿到了一条数据,我们需要把数据库的值传给Pod。避免Pod里再做一次查询。现在我们需要先把Job定义好:

import (
  batchv1 "k8s.io/api/batch/v1"
  apiv1 "k8s.io/api/core/v1"
  "k8s.io/apimachinery/pkg/api/resource"
  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// job所需配置
type JobsSpec struct {
  Namespace string
  Image     string
  Prefix    string
}

// 返回指定的cpu、memory资源值
// 写法参考k8s见:https://github.com/kubernetes/kubernetes/blob/b3875556b0edf3b5eaea32c69678edcf4117d316/pkg/kubelet/cm/helpers_linux_test.go#L36-L53
func getResourceList(cpu, memory string) apiv1.ResourceList {
  res := apiv1.ResourceList{}
  if cpu != "" {
    res[apiv1.ResourceCPU] = resource.MustParse(cpu)
  }
  if memory != "" {
    res[apiv1.ResourceMemory] = resource.MustParse(memory)
  }
  return res
}

// 返回ResourceRequirements对象,详细见getResourceList函数注释
func getResourceRequirements(requests, limits apiv1.ResourceList) apiv1.ResourceRequirements {
  res := apiv1.ResourceRequirements{}
  res.Requests = requests
  res.Limits = limits
  return res
}

// 转为指针
func newInt64(i int64) *int64 {
  return &i
}

// 创建job的配置
// 返回指定的cpu、memory资源值
// 写法参考k8s见:https://github.com/kubernetes/kubernetes/blob/b3875556b0edf3b5eaea32c69678edcf4117d316/pkg/kubelet/cm/helpers_linux_test.go#L36-L53
func getResourceList(cpu, memory string) apiv1.ResourceList {
  res := apiv1.ResourceList{}
  if cpu != "" {
    res[apiv1.ResourceCPU] = resource.MustParse(cpu)
  }
  if memory != "" {
    res[apiv1.ResourceMemory] = resource.MustParse(memory)
  }
  return res
}

// 返回ResourceRequirements对象,详细见getResourceList函数注释
func getResourceRequirements(requests, limits apiv1.ResourceList) apiv1.ResourceRequirements {
  res := apiv1.ResourceRequirements{}
  res.Requests = requests
  res.Limits = limits
  return res
}

// job所需配置
type jobsSpec struct {
  Namespace string
  Image     string
  Prefix    string
}

// 创建job的配置
func (j *jobsSpec) Create(envMap map[string]string) *batchv1.Job {
  u2 := uuid.NewV4().String()[:8]
  name := fmt.Sprint(j.Prefix, "-", u2)
  
  return &batchv1.Job{
    ObjectMeta: metav1.ObjectMeta{
      Name:      name,
      Namespace: j.Namespace,
    },
    Spec: batchv1.JobSpec{
      Template: apiv1.PodTemplateSpec{
        Spec: apiv1.PodSpec{
          RestartPolicy: "Never",
          Containers: []apiv1.Container{
            {
              Name:            name,
              Image:           j.Image,
              Env:             EnvToVars(envMap),
              ImagePullPolicy: "Always",
              Resources:       getResourceRequirements(getResourceList("2500m", "2048Mi"), getResourceList("3000m", "2048Mi")),
            },
          },
        },
      },
    },
  }
}

这里没什么好说的,基本就是资源定义,以及上门还有注释。

上面的代码其实少了一部分,这部分是把变量注入进去的。也就是EnvToVars,核心代码如下:

// 把对象转化成k8s所能接受的环境变量格式
func EnvToVars(envMap map[string]string) []v1.EnvVar {
  var envVars []v1.EnvVar
  for k, v := range envMap {
    envVar := v1.EnvVar{
      Name:  k,
      Value: v,
    }
    envVars = append(envVars, envVar)
  }
  return envVars
}

// 获取当前系统中所有的变量,并转成map方式
func GetAllEnvToMap() map[string]string {
  item := make(map[string]string)
  for _, k := range os.Environ() {
    splits := strings.Split(k, "=")
    item[splits[0]] = splits[1]
  }

  return item
}

// 合并两个map,为了更好的性能,使用闭包的方式,这样sourceMap只需要调用一次即可
func MergeMap(sourceMap map[string]string) func(insertMap map[string]string) map[string]string {
  return func(insertMap map[string]string) map[string]string {
    for k, v := range insertMap {
      sourceMap[k] = v
    }

    return sourceMap
  }
}

然后在使用的时候就是这样了:

job := jobsSpec{
  Prefix:    "project-" + "dev" + "-job",
  Image:     "docker image name",
  Namespace: "default",
}

willMergeMap := MergeMap(GetAllEnvToMap())

// dbData是从数据库里拿到的数据,格式大致如下
// [ { id: 1, url: 'xxx' }, { id: 2, url: 'yyy' } ]
for _, data := range dbData {
  currentEnvMap := willMergeMap(data)

  // 创建Job
  _, err = api.CreateJob(currentEnvMap)
  
  if err != nil {
    panic("create job fail", err.Error())
  }
}

这样一来,就实现了把当前环境变量及数据通过变量的方式传给Pod。这样的话,只需要保证当前的调度器里存在一些Pod可能会用到的变量就行了,如:S3 TokenDB Host等。通过这种方式,Pod基本上什么都不用关系,它所需要的变量,会由调度器传给它,分工明确。

优化

其实以上其实就已经完成了最核心的东西,本身也不是特别的难。很简单的逻辑。只不过光有这些是不够的,还有很多地方需要考虑。

资源判断

这里在说之前有个前提,之前说过这个调度器是不能去更改任何数据的,更改数据只能由Pod里的容器去更改。

那么这个时候就有问题了。

集群如果资源不够分配的话,那Pod将会一直处于Pending状态,根据上文,变量已经注入到Pod里了,而且由于里面的容器没有启动。那就会导致数据没有更改,而没有更改的数据,调度器就会一直认为他的新的。导致会为这条数据再启动一个Job,一直循环到当集群资源足够后其中的一个Pod去更改了数据。

举个例子,假设数据库里有一个status的字段,当值为wating时,调度器就认为这是一条新数据,会把这个数据转变成环境变量注入到Pod里,再由Pod去把waiting更改成process。调度器每3分钟去扫一次数据,所以Pod必须在3分钟内把数据更改完毕。

而这时由于资源不够,k8s创建了这个Pod,但是里面的代码没有运行,导致没有去更改数据,就会导致调度器一直去为同一条数据创建Pod。

解决方案也比较简单,只要去判断下Pod的状态是否为Pending,如果是,则不再创建Pod。下面是核心代码:

func HavePendingPod() (bool, error) {
  // 获取当前namespace下所有的pod
  pods, err := clientset.CoreV1().Pods(Namespace).List(metaV1.ListOptions{})
  if err != nil {
    return false, err
  }

  // 循环pod,判断每个pod是否符合当前的前缀,如果符合,则说明当前的环境已经存在Pending状态了
  for _, v := range pods.Items {
    phase := v.Status.Phase
    if phase == "Pending" {
      if strings.HasPrefix(v.Name, Prefix) {
        return true, nil
      }
    }
  }

  return false, nil
}

当为true时,就不再创建Job

Job数量最大值

集群的资源也不是无限的,虽然我们对Pending情况做了处理,但是这只是一种防御手段。我们还是要对数量进行一个管控,当Job数量等于某个值时,不在创建Job了。代码也很简单,我这里就把获取当前环境下Job数量的代码放出来:

// 获取当前namespace下同环境的job Item实例
func GetJobListByNS() ([]v1.Job, error) {
  var jobList, err = clientset.BatchV1().Jobs(Namespace).List(metaV1.ListOptions{})
    if err != nil {
    return nil, err
  }

  // 过滤非同前缀的Job
  var item []v1.Job
  for _, v := range jobList.Items {
    if strings.HasPrefix(v.Name, Prefix) {
      item = append(item, v)
    }
  }

  return item, nil
}

func GetJobLenByNS() (int, error) {
  jobItem, err := api.GetJobListByNS()
  if err != nil {
      return 最大值, err
  }
  
  return len(jobItem), nil
}

删除已完成和失败的

上面的代码其实是有问题的,k8s的Job资源类型是有一个特性是,当完成或者失败的时候,并不会删除自身,也就说即使他完成了,它的数据还会一直停留在那。所以上面的代码会把一些已经完成或者失败的Job也统计进去。到最后会出现一直无法创建Job的窘迫。

解决方案有两个,第一个是在声明Job资源时,添加spec.ttlSecondsAfterFinished属性来做到k8s自动回收完成、失败的Job。可惜的是这是高版本才有的属性,我司很遗憾是低版本的。那就只能用第二种方法了,就是在每次获取数量前,调用api把完成、失败的Job删除:

func DeleteCompleteJob() error {
  jobItem, err := GetJobListByNS()
  if err != nil {
    return err
  }

  // 如果不指定此属性,删除job时,不会删除pod
  propagationPolicy := metaV1.DeletePropagationForeground
  for _, v := range jobItem {
    // 只删除已经结束的job
    if v.Status.Failed == 1 || v.Status.Succeeded == 1 {
      err := clientset.BatchV1().Jobs(Namespace).Delete(v.Name, &metaV1.DeleteOptions{
        PropagationPolicy: &propagationPolicy,
      })

      if err != nil {
        return err
      }
    }
  }

  return nil
}

结尾

整个调度器的代码比较简单,没有必要专门抽成一个库来做。只要知道大概的思路,就可以根据自己的项目需求做出适合自己项目组的调度器。

感谢我司大佬@qqshfox提供的思路