基于 k8s v1.27.0 进行源码分析其原理.

入口

创建 daemonset controller 控制器对象, 并且在内部使用 informer 注册监听 daemonset, pod, node 资源的变更事件. 注册 eventHandler 没什么可说的,跟其他控制逻辑类型,就是把对象往 queue 里推.

func NewDaemonSetsController(
    ...
) (*DaemonSetsController, error) {
    dsc := &DaemonSetsController{
        ...
    }

    // daemonset informer
    daemonSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dsc.addDaemonset,
        UpdateFunc: dsc.updateDaemonset,
        DeleteFunc: dsc.deleteDaemonset,
    })

    // pod informer
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dsc.addPod,
        UpdateFunc: dsc.updatePod,
        DeleteFunc: dsc.deletePod,
    })

    // node informer
    nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dsc.addNode,
        UpdateFunc: dsc.updateNode,
    })

    return dsc, nil
}

Run() 启动时要确保各个 informer 同步完毕,cache.WaitForNamedCacheSync 的实现很简单,就是周期性的判断所有的 informer 是否 synced 同步完成, 一直轮询知道成功同步完毕. 启动多个 runWorker 协程, 接着启动一个 gc 回收协程.

informer 会把事件生成 key, 怼到队列里, runwokrer 会监听 queue, 然后调用 syncHandler 去完成 daemonset 的同步状态. syncHandler 的具体实现函数是 syncDaemonSet 方法.

func (dsc *DaemonSetsController) Run(ctx context.Context, workers int) {
    // 启动时要确保各个 informer 同步完毕, WaitForNamedCacheSync 的实现很简单,就是周期性的判断所有的 informer 是否 synced 同步完成, 一直轮询到成功.
    if !cache.WaitForNamedCacheSync("daemon sets", ctx.Done(), dsc.podStoreSynced, dsc.nodeStoreSynced, dsc.historyStoreSynced, dsc.dsStoreSynced) {
        return
    }

    // 启动多个 runWorker
    for i := 0; i < workers; i++ {
        go wait.UntilWithContext(ctx, dsc.runWorker, time.Second)
    }

    // 启动 gc 回收
    go wait.Until(dsc.failedPodsBackoff.GC, BackoffGCInterval, ctx.Done())

    <-ctx.Done()
}

func (dsc *DaemonSetsController) runWorker(ctx context.Context) {
    // 一直循环调用, controller 只有运行,没有退出,当然也不需要退出.
    for dsc.processNextWorkItem(ctx) {
    }
}

func (dsc *DaemonSetsController) processNextWorkItem(ctx context.Context) bool {
    // 从队列里获取 daemonset key, queue 内部有条件变量,拿不到资源会陷入等待.
    dsKey, quit := dsc.queue.Get()
    if quit {
        return false
    }
    defer dsc.queue.Done(dsKey)

    // 执行同步函数, syncHanlder 是 ds 里关键业务处理入口
    err := dsc.syncHandler(ctx, dsKey.(string))
    if err == nil {
        dsc.queue.Forget(dsKey)
        return true
    }

    dsc.queue.AddRateLimited(dsKey)
    return true
}

同步管理 daemonset 配置

关键函数的调用关系如下:

syncDaemonSet

下面是 syncDaemonSet 的流程:

  1. 从 key 获取 namespace 和 name.
  2. 从 ds infomrer lister 里获取 ds 对象.
  3. 获取所有的 node 集合.
  4. 判断是否满足 expectations 条件, 当不满足预期时, 只更新状态即可, 满足则继续运行.
  5. 同步 daemonset 配置.
  6. 更新 daemonset status 状态.
func (dsc *DaemonSetsController) syncDaemonSet(ctx context.Context, key string) error {
    // 从 key 拆分 namespace 和 ds name, key 的格式为 namespace/name
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }

    // 从 informer list 里获取 ds 对象
    ds, err := dsc.dsLister.DaemonSets(namespace).Get(name)
    if apierrors.IsNotFound(err) { // 找不到则直接跳出
        return nil
    }

    // 获取所有 node 列表
    nodeList, err := dsc.nodeLister.List(labels.Everything())
    if err != nil {
        return err
    }

    // 获取 dskey
    dsKey, err := controller.KeyFunc(ds)


    // 通过 ds 获取 current, old 的 constructHistory
    cur, old, err := dsc.constructHistory(ctx, ds)
    if err != nil {
        return fmt.Errorf("failed to construct revisions of DaemonSet: %v", err)
    }
    hash := cur.Labels[apps.DefaultDaemonSetUniqueLabelKey]

    // 是否满足 expectations 条件, 当不满足预期时, 只更新状态即可.
    if !dsc.expectations.SatisfiedExpectations(dsKey) {
        return dsc.updateDaemonSetStatus(ctx, ds, nodeList, hash, false)
    }

    // 更新 daemonset, 关键函数
    err = dsc.updateDaemonSet(ctx, ds, nodeList, hash, dsKey, old)

    // 更新 daemonset 的 status 状态
    statusErr := dsc.updateDaemonSetStatus(ctx, ds, nodeList, hash, true)
    ...
    return nil
}

updateDaemonSet

分析代码得知 updateDaemonSet() 的流程如下:

  1. updateDaemonSet() 内部通过 manage 来同步 daemonset 的状态.
  2. 当满足 expectations 且更新策略为 RollingUpdate 滚动更新, 则调用 rollingUpdate 进行滚动更新.
  3. 最后调用 cleanupHistory 来进行清理过期的 ControllerRevision.
func (dsc *DaemonSetsController) updateDaemonSet(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash, key string, old []*apps.ControllerRevision) error {
    // 处理 daemonset
    err := dsc.manage(ctx, ds, nodeList, hash)
    if err != nil {
        return err
    }

    if dsc.expectations.SatisfiedExpectations(key) {
        switch ds.Spec.UpdateStrategy.Type {
        case apps.OnDeleteDaemonSetStrategyType:
        case apps.RollingUpdateDaemonSetStrategyType:
            // 类型为 rollingUpdate, 则进行滚动更新
            err = dsc.rollingUpdate(ctx, ds, nodeList, hash)
        }
        if err != nil {
            return err
        }
    }

    // 清理不需要的 cleanupHistory
    err = dsc.cleanupHistory(ctx, ds, old)
    if err != nil {
        return fmt.Errorf("failed to clean up revisions of DaemonSet: %w", err)
    }

    return nil
}

manage

分析代码得知 manage() 的流程如下:

  1. 通过 getNodesToDaemonPods() 获取 daemonset 和 node 关系, 返回的结构为 map[node][]*v1.Pod.
  2. 遍历 node 集合, 通过 podsShouldBeOnNode 获取 node 需要创建和删除的 ds pods 集合.
  3. getUnscheduledPodsWithoutNode 获取不能调度的 Pods 集合, 就是遍历 nodeToDaemonPods, 把不在 nodeList 集合里的 node 相关的 pods 追加到待删除集合里.
  4. 调用 syncNodes 来在一些 node 上创建或者删除一些 daemonset pod.
func (dsc *DaemonSetsController) manage(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
    // 获取 node 和 ds 的对应关系
    nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds)
    if err != nil {
        return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
    }

    var nodesNeedingDaemonPods, podsToDelete []string
    // 遍历 node 集合
    for _, node := range nodeList {
        // 获取该 node 需要创建或者删除的 pods 集合.
        nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode := dsc.podsShouldBeOnNode(
            node, nodeToDaemonPods, ds, hash)

        // 添加到待新增集合里
        nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodesNeedingDaemonPodsOnNode...)

        // 添加到待删除集合里
        podsToDelete = append(podsToDelete, podsToDeleteOnNode...)
    }

    // 获取不能调度的 Pods 集合, 把不在 nodeList 集合里的 node 相关的 pods 追加到待删除集合里.
    podsToDelete = append(podsToDelete, getUnscheduledPodsWithoutNode(nodeList, nodeToDaemonPods)...)

    // 同步操作, 这里会执行添加和删除操作
    if err = dsc.syncNodes(ctx, ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil {
        return err
    }

    return nil
}

syncNodes

syncNodes 方法主要是为需要 daemonset 的 node 创建 pod 以及删除多余的 pod, 源码流程如下:

  1. 计算本次 createDiff 和 deleteDiff 增减的个数, 每次 sync 最多执行 250 个, 多余的需要等待下次调用解决.
  2. 提前先把 createDiff 和 deleteDiff 写入到 expectations 中.
  3. 指数级增量批量创建 daemonset pod, 执行完毕后需要依次减少 expectations creation.
  4. 直接遍历并发删除 daemonset pod, 执行完毕后需要减少 expectations deletion.
const (
    // BurstReplicas is a rate limiter for booting pods on a lot of pods.
    // 最多单词执行 250 个
    BurstReplicas = 250
)

func (dsc *DaemonSetsController) syncNodes(ctx context.Context, ds *apps.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error {
    // 获取 dsKey
    dsKey, err := controller.KeyFunc(ds)
    if err != nil {
        return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
    }

    // 需要创建的 ds pod 的个数
    createDiff := len(nodesNeedingDaemonPods)

    // 需要删除删减 ds pod 的个数
    deleteDiff := len(podsToDelete)

    // 一次新增不能超过 250
    if createDiff > dsc.burstReplicas {
        createDiff = dsc.burstReplicas
    }

    // 一次删减不能超过 250
    if deleteDiff > dsc.burstReplicas {
        deleteDiff = dsc.burstReplicas
    }

    // 写入预期值, 声明该 ds key 需要 add 添加 和 del 删除的个数
    dsc.expectations.SetExpectations(dsKey, createDiff, deleteDiff)

    // 收集错误
    errCh := make(chan error, createDiff+deleteDiff)

    generation, err := util.GetTemplateGeneration(ds)
    if err != nil {
        generation = nil
    }

    // 获取 pod 模板
    template := util.CreatePodTemplate(ds.Spec.Template, generation, hash)

    // 求最小, 默认为 1 个
    batchSize := integer.IntMin(createDiff, controller.SlowStartInitialBatchSize)

    // 按照指数级增长进行创建 pod, 依次是 1, 2, 4, 8 ...
    for pos := 0; createDiff > pos; batchSize, pos = integer.IntMin(2*batchSize, createDiff-(pos+batchSize)), pos+batchSize {
        errorCount := len(errCh)
        createWait.Add(batchSize)
        for i := pos; i < pos+batchSize; i++ {
            go func(ix int) {
                // 在 pod templaet 模板里的配置节点的affinity, 也就是指定 node 节点.
                podTemplate := template.DeepCopy()
                podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity(
                    podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix])

                // 创建 pods
                err := dsc.podControl.CreatePods(ctx, ds.Namespace, podTemplate,
                    ds, metav1.NewControllerRef(ds, controllerKind))

                if err != nil {
                    // 在 expectations 预期里减一 creation.
                    dsc.expectations.CreationObserved(dsKey)

                    // 传递错误
                    errCh <- err
                }
            }(i)
        }
        createWait.Wait()

        skippedPods := createDiff - (batchSize + pos)
        // 创建完了则退出
        if errorCount < len(errCh) && skippedPods > 0 {
            break
        }
    }

    deleteWait := sync.WaitGroup{}
    deleteWait.Add(deleteDiff)
    for i := 0; i < deleteDiff; i++ {
        go func(ix int) {
            // 删除 daemonset pod, 指定 ns 和 podid
            if err := dsc.podControl.DeletePod(ctx, ds.Namespace, podsToDelete[ix], ds); err != nil {
                // 在 expectations 预期里减去一个 deletion 值.
                dsc.expectations.DeletionObserved(dsKey)
            }
        }(i)
    }
    deleteWait.Wait()

    errors := []error{}
    for err := range errCh {
        errors = append(errors, err)
    }
    return utilerrors.NewAggregate(errors)
}
创建 pods (CreatePods)

在 pod 里注入引用 OwnerReferences 关系, 然后调用 kubeclient 来创建 pod.

func (r RealPodControl) CreatePods(ctx context.Context, namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error {
    return r.CreatePodsWithGenerateName(ctx, namespace, template, controllerObject, controllerRef, "")
}

func (r RealPodControl) CreatePodsWithGenerateName(ctx context.Context, namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference, generateName string) error {
    // 注册引用关系
    pod, err := GetPodFromTemplate(template, controllerObject, controllerRef)
    if err != nil {
        return err
    }
    if len(generateName) > 0 {
        pod.ObjectMeta.GenerateName = generateName
    }
    return r.createPods(ctx, namespace, pod, controllerObject)
}

func (r RealPodControl) createPods(ctx context.Context, namespace string, pod *v1.Pod, object runtime.Object) error {
    // 调用 kubeclient 进行 pod 创建
    newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(ctx, pod, metav1.CreateOptions{})
    if err != nil {
        return err
    }
    accessor, err := meta.Accessor(object)
    ...

    return nil
}
删除 pod (deletePod)

直接调用 kubeclient 来删除 pod, 按照 namespace 和 podID 来删除.

func (r RealPodControl) DeletePod(ctx context.Context, namespace string, podID string, object runtime.Object) error {
    ...

    if err := r.KubeClient.CoreV1().Pods(namespace).Delete(ctx, podID, metav1.DeleteOptions{}); err != nil {
        return fmt.Errorf("unable to delete pods: %v", err)
    }

    ....

    return nil
}

rollingUpdate 滚动更新

rollingUpdate 的过程相对繁琐一些, 获取 daemonset 的 node 映射关系, 计算 maxSurge, maxUnavailable 值, 获取新旧 pod 对象, 根据条件选择放到新增 nodes 集合还是待删除的 pods 集合. 最后选择使用 syncNodes 来同步配置,也就是增删 daemonset pods.

func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
    // 获取 daemonset 的 node 对应关系
    nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds)
    if err != nil {
        return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
    }

    // 计算 maxSurge, maxUnavailable 值
    maxSurge, maxUnavailable, err := dsc.updatedDesiredNodeCounts(ds, nodeList, nodeToDaemonPods)
    if err != nil {
        return fmt.Errorf("couldn't get unavailable numbers: %v", err)
    }

    ...

    for nodeName, pods := range nodeToDaemonPods {
        // 获取新旧 pod 对象
        newPod, oldPod, ok := findUpdatedPodsOnNode(ds, pods, hash)
        ...
        switch {
        case newPod == nil:
            switch {
            case !podutil.IsPodAvailable(oldPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}):
                if allowedNewNodes == nil {
                    allowedNewNodes = make([]string, 0, len(nodeToDaemonPods))
                }
                // 待添加 nodes 集合
                allowedNewNodes = append(allowedNewNodes, nodeName)
            default:
                if candidateNewNodes == nil {
                    candidateNewNodes = make([]string, 0, maxSurge)
                }
                // 待添加 nodes 集合
                candidateNewNodes = append(candidateNewNodes, nodeName)
            }
        default:
            // 待删除 pods 集合
            oldPodsToDelete = append(oldPodsToDelete, oldPod.Name)
        }
    }

    ...
    newNodesToCreate := append(allowedNewNodes, candidateNewNodes[:remainingSurge]...)

    // 调用 syncNodes 来创建新 pod 和 清理旧容器.
    return dsc.syncNodes(ctx, ds, oldPodsToDelete, newNodesToCreate, hash)
}

预期 expectations 的设计

expectations 作用

expectations 本质就是个状态结构, 维护一个资源的 add (新增个数) 和 del (删除个数) 两个值, 这个资源在这里是 daemonset, 也可以是 deployment, replicaset, job 等对象.

expectations 会把资源进行格式化为 namespace/name 的字符串, 该字符串作为 expectations 的 key.

那么如何使用 expectations 预期值 ?

在同步 daemonset 配置前, 先判断是否配置了预期值, 通常当不满足预期时, 会进行下一步处理, 否则直接跳出.
操作同步之前, 需要先在 expectations 调用里声明下需要新增和删除的个数. 然后当每次创建完 pod 后, 需要对相应的 add 进行减一. 同理, 删除操作完成后需要对 del 字段进行减一.

type ControllerExpectations struct {
    cache.Store
}

func (r *ControllerExpectations) GetExpectations(controllerKey string) (*ControlleeExpectations, bool, error) {
    exp, exists, err := r.GetByKey(controllerKey)
    if err == nil && exists {
        return exp.(*ControlleeExpectations), true, nil
    }
    return nil, false, err
}

func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) bool {
    if exp, exists, err := r.GetExpectations(controllerKey); exists {
        if exp.Fulfilled() {
            return true
        } else if exp.isExpired() {
            return true
        } else {
            return false
        }
    }
    ...
}

func (r *ControllerExpectations) CreationObserved(controllerKey string) {
    r.LowerExpectations(controllerKey, 1, 0)
}

func (r *ControllerExpectations) DeletionObserved(controllerKey string) {
    r.LowerExpectations(controllerKey, 0, 1)
}

func (r *ControllerExpectations) LowerExpectations(controllerKey string, add, del int) {
    if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists {
        exp.Add(int64(-add), int64(-del))
    }
}

ControlleeExpectations 结构体维护了 add 和 del 数字, ds controller 调用 add() 方法对预期值进行递减. 当 add 和 del 等于小于 0 时, 则可以认定完事了.

type ControlleeExpectations struct {
    add       int64
    del       int64
    key       string
    timestamp time.Time
}

func (e *ControlleeExpectations) Add(add, del int64) {
    atomic.AddInt64(&e.add, add)
    atomic.AddInt64(&e.del, del)
}

func (e *ControlleeExpectations) Fulfilled() bool {
    return atomic.LoadInt64(&e.add) <= 0 && atomic.LoadInt64(&e.del) <= 0
}

func (e *ControlleeExpectations) GetExpectations() (int64, int64) {
    return atomic.LoadInt64(&e.add), atomic.LoadInt64(&e.del)
}

func (exp *ControlleeExpectations) isExpired() bool {
    return clock.RealClock{}.Since(exp.timestamp) > ExpectationsTimeout
}

更新 daemonset 状态

func (dsc *DaemonSetsController) updateDaemonSetStatus(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string, updateObservedGen bool) error {
    // 获取 daemonset pod 跟 node 映射关系
    nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds)
    if err != nil {
        return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
    }

    var desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable int

    // 遍历 node 集合
    for _, node := range nodeList {
        shouldRun, _ := NodeShouldRunDaemonPod(node, ds)
        scheduled := len(nodeToDaemonPods[node.Name]) > 0

        if shouldRun {
            desiredNumberScheduled++
            if !scheduled {
                continue
            }

            currentNumberScheduled++
            daemonPods, _ := nodeToDaemonPods[node.Name]
            sort.Sort(podByCreationTimestampAndPhase(daemonPods))
            pod := daemonPods[0]
            if podutil.IsPodReady(pod) {
                numberReady++
                if podutil.IsPodAvailable(pod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) {
                    numberAvailable++
                }
            }

            if util.IsPodUpdated(pod, hash, generation) {
                updatedNumberScheduled++
            }
        } else {
            if scheduled {
                numberMisscheduled++
            }
        }
    }
    numberUnavailable := desiredNumberScheduled - numberAvailable

    // 更新 daemonset 状态
    err = storeDaemonSetStatus(ctx, dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace), ds, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable, numberUnavailable, updateObservedGen)
    if err != nil {
        return fmt.Errorf("error storing status for daemon set %#v: %w", ds, err)
    }

    // 当就绪计数不相等时, 继续延迟入队
    if ds.Spec.MinReadySeconds > 0 && numberReady != numberAvailable {
        dsc.enqueueDaemonSetAfter(ds, time.Duration(ds.Spec.MinReadySeconds)*time.Second)
    }
    return nil
}

大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc