目录

paddle-operator分析

operator实际上就是控制器,核心逻辑是Reconcile

paddle operator是在"sigs.k8s.io/controller-runtime"基础上实现的控制器

源码版本: v0.3.0

源码地址:https://github.com/PaddleFlow/paddle-operator

Reconcile

Reconcile实现,将用户在对象中指定的状态与实际群集状态进行比较,然后执行操作,使实际群集状态反映用户指定的状态。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64

// Request contains the information necessary to reconcile a Kubernetes object. This is only the
// information that uniquely identifies the object: its Name and Namespace. It does NOT contain
// information about any specific Event, nor the contents of the object itself.
type Request struct {
    // NamespacedName is the name and namespace of the object to reconcile.
    types.NamespacedName
}

// Result contains the result of a Reconciler invocation.
type Result struct {
    // Requeue tells the Controller to requeue the reconcile key. Defaults to false.
    Requeue bool

    // RequeueAfter, if greater than 0, tells the Controller to requeue the reconcile key after the Duration.
    // It implies that Requeue is true, so there is no need to set Requeue to true together with RequeueAfter.
    RequeueAfter time.Duration
}

/*
Reconciler implements a Kubernetes API for a specific Resource by Creating, Updating or Deleting Kubernetes
objects, or by making changes to systems external to the cluster (e.g. cloudproviders, github, etc).

reconcile implementations compare the state specified in an object by a user against the actual cluster state,
and then perform operations to make the actual cluster state reflect the state specified by the user.

Typically, reconcile is triggered by a Controller in response to cluster Events (e.g. Creating, Updating,
Deleting Kubernetes objects) or external Events (GitHub Webhooks, polling external sources, etc).

Example reconcile Logic:

    * Read an object and all the Pods it owns.
    * Observe that the object spec specifies 5 replicas but actual cluster contains only 1 Pod replica.
    * Create 4 Pods and set their OwnerReferences to the object.

reconcile may be implemented as either a type (note that the method takes a context.Context first,
matching the interface below):

    type reconcile struct {}

    func (reconcile) Reconcile(context.Context, controller.Request) (controller.Result, error) {
        // Implement business logic of reading and writing objects here
        return controller.Result{}, nil
    }

Or as a function:

    controller.Func(func(ctx context.Context, o controller.Request) (controller.Result, error) {
        // Implement business logic of reading and writing objects here
        return controller.Result{}, nil
    })

Reconciliation is level-based, meaning action isn't driven off changes in individual Events, but instead is
driven by actual cluster state read from the apiserver or a local cache.
For example if responding to a Pod Delete Event, the Request won't contain that a Pod was deleted,
instead the reconcile function observes this when reading the cluster state and seeing the Pod as missing.
*/
type Reconciler interface {
    // Reconcile performs a full reconciliation for the object referred to by the Request.
    // The Controller will requeue the Request to be processed again if the returned error is non-nil or
    // Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
    Reconcile(context.Context, Request) (Result, error)
}


Reconcile实现

Reconcile函数处理逻辑简化如下

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35


// Reconcile is shown here as a simplified outline of the PaddleJob reconcile flow:
// only the main calls of each step are kept, so it is not compilable as written.
func (r *PaddleJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    // 1. Fetch the PaddleJob object pdj.
    r.Get(ctx, req.NamespacedName, &pdj)
    
    // Finalizer handling for pdj.
  
    // 2. List the pods associated with this PaddleJob.
    r.List(ctx, &childPods, client.InNamespace(req.Namespace), client.MatchingFields{ctrlRefKey: req.Name})
    
    // 3. Sync pdj's status from the latest pod states.
    r.syncCurrentStatus(ctx, &pdj, childPods)

    // 4. Delete pods that are not needed, e.g. when there are more pod replicas than pdj.GetSpecs defines.
    r.deleteResource(ctx, &pdj, &childPods.Items[i])
    // 5. If pdj.Spec.Intranet == pdv1.Service, create a svc for every pod associated with pdj.
    
    // 6. Clean up pods and svcs in Failed and Completed state according to the policy.
    cleanOne()
    
    // 7. Create pods based on comparing pdj.GetStatuses with pdj.GetSpecs.
    createPod(res, i)
    // constructPod
    
    // 8. When Spec.Elastic is not set, wait until all pods of pdj are up (running, presumably ready) and then create the configmap for pdj.
    // Create configmap of global env for all pods after all pods are running
    // Note: the precise meaning of isAllPodsReady here is as follows.
    // Since the ip or alternative information of pods are collected to the configmap,  the configmap will be created after the pods allocated but the pods will not running until configmap ready.
    // Every pod must also have its IP assigned (pod.Status.PodIP) before the configmap for pdj can be created.
    if pdj.Spec.Elastic == nil && isAllPodsReady(&pdj) {
        cm := constructConfigMap(&pdj, childPods)
        r.createResource(ctx, &pdj, cm)
    }
}

syncCurrentStatus

同步更新pdj.Status

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
// syncCurrentStatus rebuilds pdj.Status from the current state of the job's pods.
//
// The top-level Phase, Mode, StartTime and CompletionTime are recomputed by the
// getPaddleJob* helpers. They are evaluated before pdj.Status is overwritten, so
// they see the status left by the previous call. The per-resource counters are
// then recounted from childPods and stored through pdj.SetStatus, keyed by each
// pod's pdv1.ResourceAnnotation value.
func (r *PaddleJobReconciler) syncCurrentStatus(ctx context.Context, pdj *pdv1.PaddleJob, childPods corev1.PodList) {
    syncStatusByPod := func(ss *pdv1.ResourceStatus, pod *corev1.Pod) {
        // Pods created before the job itself are not counted.
        if pod.CreationTimestamp.Before(&pdj.CreationTimestamp) {
            return
        }
        // Map the pod phase onto the job's per-resource counters.
        switch pod.Status.Phase {
        case corev1.PodPending:
            ss.Pending++
        case corev1.PodRunning:
            // A Running pod only counts as Running once isPodRealRuning accepts it;
            // until then its containers are not all up and it counts as Starting.
            if isPodRealRuning(pod) {
                ss.Running++
            } else {
                ss.Starting++
            }
        case corev1.PodFailed:
            ss.Failed++
        case corev1.PodSucceeded:
            ss.Succeeded++
        }
        pref, err := ref.GetReference(r.Scheme, pod)
        if err != nil {
            // Best effort: the pod has been counted, only its reference is left out.
            return
        }
        // Record a reference to the pod in the resource status.
        ss.Refs = append(ss.Refs, *pref)
    }

    pdj.Status = pdv1.PaddleJobStatus{
        Phase:          getPaddleJobPhase(pdj),
        Mode:           getPaddleJobMode(pdj),
        StartTime:      getPaddleJobStartTime(pdj),
        CompletionTime: getPaddleJobCompleteTime(pdj),
    }

    statuses := map[string]*pdv1.ResourceStatus{}
    // Index the slice rather than ranging by value: corev1.Pod is a large struct
    // and a by-value range would copy every element.
    for i := range childPods.Items {
        pod := &childPods.Items[i]
        resType := pod.Annotations[pdv1.ResourceAnnotation]
        if statuses[resType] == nil {
            statuses[resType] = &pdv1.ResourceStatus{}
        }
        syncStatusByPod(statuses[resType], pod)
    }
    for resType, status := range statuses {
        pdj.SetStatus(resType, status)
    }
}

// SetStatus stores status as the status of the resource kind named by resType.
// Resource types other than ResourcePS, ResourceWorker and ResourceHeter are ignored.
func (pdj *PaddleJob) SetStatus(resType string, status *ResourceStatus) {
    // Pick the destination field first, then assign once.
    var slot **ResourceStatus
    switch resType {
    case ResourcePS:
        slot = &pdj.Status.PS
    case ResourceWorker:
        slot = &pdj.Status.Worker
    case ResourceHeter:
        slot = &pdj.Status.Heter
    default:
        return
    }
    *slot = status
}


helper func

pdj的状态判断函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// isAllPodsReady reports whether every resource type of pdj passes isPodReady,
// comparing each entry of pdj.GetSpecs with the matching entry of pdj.GetStatuses.
func isAllPodsReady(pdj *pdv1.PaddleJob) bool {
    statuses := pdj.GetStatuses()
    for resType, spec := range pdj.GetSpecs() {
        if !isPodReady(spec, statuses[resType]) {
            return false
        }
    }
    return true
}

// isPodReady reports whether a resource type has all of its pods accounted for:
// either the resource is not specified at all (spec is nil), or the status holds
// exactly spec.Replicas pod references. It looks only at the number of
// references, not at the pods' phases.
func isPodReady(spec *pdv1.ResourceSpec, status *pdv1.ResourceStatus) bool {
    return spec == nil || (status != nil && len(status.Refs) == spec.Replicas)
}

// isFailed reports whether status records at least one failed pod.
// A nil status is never failed.
func isFailed(status *pdv1.ResourceStatus) bool {
    if status == nil {
        return false
    }
    return status.Failed > 0
}
// isPending reports whether status records at least one pending pod.
// A nil status is never pending.
func isPending(status *pdv1.ResourceStatus) bool {
    if status == nil {
        return false
    }
    return status.Pending > 0
}
// isStarting reports whether status records at least one starting pod.
// A nil status is never starting.
func isStarting(status *pdv1.ResourceStatus) bool {
    if status == nil {
        return false
    }
    return status.Starting > 0
}
// isRunning reports whether a resource type counts as running: it is either not
// specified (spec is nil) or its running pod count equals spec.Replicas.
func isRunning(spec *pdv1.ResourceSpec, status *pdv1.ResourceStatus) bool {
    if spec == nil {
        return true
    }
    if status == nil {
        return false
    }
    return status.Running == spec.Replicas
}
// isCompleted reports whether a resource type counts as completed: it is either
// not specified (spec is nil) or its succeeded pod count equals spec.Replicas.
func isCompleted(spec *pdv1.ResourceSpec, status *pdv1.ResourceStatus) bool {
    if spec == nil {
        return true
    }
    if status == nil {
        return false
    }
    return status.Succeeded == spec.Replicas
}

configmap构造

pod构造

operator 会给pod自动注入一些配置字段,如configmap

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// constructPod (excerpt) shows the settings the operator injects into the pod for
// resource type resType at index idx: environment variables, the configmap
// reference, container port / host network and the restart policy.
// Code elided by the original author is marked with "// ..."; pod and name are
// presumably initialised in that elided part.
func constructPod(pdj *pdv1.PaddleJob, resType string, idx int) (pod *corev1.Pod) {
    
    // ...
    // POD_IP: in Service mode it is set to name, otherwise it is taken from the
    // pod's status.podIP field.
    envIP := corev1.EnvVar{
        Name: "POD_IP",
    }
    if pdj.Spec.Intranet == pdv1.Service {
        envIP.Value = name
    } else {
        envIP.ValueFrom = &corev1.EnvVarSource{
            FieldRef: &corev1.ObjectFieldSelector{
                FieldPath: "status.podIP",
            },
        }
    }
    envRank := corev1.EnvVar{
        Name:  "PADDLE_TRAINER_ID",
        Value: fmt.Sprintf("%d", idx),
    }
    // The role is exported under two variable names with the same value.
    envRole := corev1.EnvVar{
        Name:  "TRAINING_ROLE",
        Value: pdv1.TrainingRole[resType],
    }
    envRole2 := corev1.EnvVar{
        Name:  "PADDLE_TRAINING_ROLE",
        Value: pdv1.TrainingRole[resType],
    }
    pod.Spec.Containers[0].Env = append(pod.Spec.Containers[0].Env, envIP, envRank, envRole, envRole2)
    // The remaining pod fields are set differently for the two scenarios below.
    // Scenario 1: pdj.Spec.Elastic is set, so the pod does not use pdj's configmap.
    if pdj.Spec.Elastic != nil {
        envJobID := corev1.EnvVar{
            Name:  "PADDLE_ELASTIC_JOB_ID",
            Value: fmt.Sprintf("%s-%s", pdj.Namespace, pdj.Name),
        }
        // NOTE(review): dereferences pdj.Spec.Worker without a nil check; presumably
        // elastic jobs always define a worker spec. Confirm against validation.
        envNP := corev1.EnvVar{
            Name:  "PADDLE_ELASTIC_NP",
            Value: fmt.Sprintf("%d", pdj.Spec.Worker.Replicas),
        }
        envTimeout := corev1.EnvVar{
            Name:  "PADDLE_ELASTIC_TIMEOUT",
            Value: "60",
        }

        pod.Spec.Containers[0].Env = append(pod.Spec.Containers[0].Env, envJobID, envNP, envTimeout)
    } else {
        // Scenario 2: pdj.Spec.Elastic is not set, so the pod's container loads its
        // environment from the configmap built for pdj (named after pdj.Name).
        envF := corev1.EnvFromSource{
            ConfigMapRef: &corev1.ConfigMapEnvSource{
                LocalObjectReference: corev1.LocalObjectReference{
                    Name: pdj.Name,
                },
            },
        }

        pod.Spec.Containers[0].EnvFrom = append(pod.Spec.Containers[0].EnvFrom, envF)
    }

    if pdj.Spec.Intranet == pdv1.Service {
        // Expose PADDLE_PORT on the container (2379 according to the original note).
        pod.Spec.Containers[0].Ports = append(pod.Spec.Containers[0].Ports, corev1.ContainerPort{ContainerPort: PADDLE_PORT})
    } else if pdj.Spec.Intranet == pdv1.HostNetwork {
        pod.Spec.HostNetwork = true
    }
    // Configure the pod RestartPolicy: elastic jobs always use OnFailure; otherwise
    // a policy already set on the pod is kept, and the default is OnFailure for
    // workers in Service mode and Never for everything else.
    if pdj.Spec.Elastic != nil {
        pod.Spec.RestartPolicy = "OnFailure"
    } else if pod.Spec.RestartPolicy == "" {
        if resType == pdv1.ResourceWorker && pdj.Spec.Intranet == pdv1.Service {
            pod.Spec.RestartPolicy = "OnFailure"
        } else {
            pod.Spec.RestartPolicy = "Never"
        }
    }

    return pod
}

状态图

paddle job状态图,如下:

说明:

  • operator目前处理使用的 paddle job状态有:
    • Pending
    • Starting
    • Running
    • Completed
    • Failed
  • Pending为初态
  • Failed和Completed是终态
  • operator对pod 真正Running判断成立条件:pod的status和其所有容器的status都是running,且容器ready
  • Starting是operator为paddle job自定义的状态(对应ResourceStatus.Starting),由k8s的Running pod进一步细分得出:
    • 对Running pod进行了细分,表示pod是否RealRunning,即pod中的容器是否ready

其它

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
// ResourceStatus holds the pod counters and pod references of one resource type
// of a PaddleJob. The counters are filled in by syncCurrentStatus from the phase
// of each pod.
type ResourceStatus struct {
    // Pending is the number of pods in the Pending phase.
    Pending int `json:"pending,omitempty"`
    // Starting is the number of pods whose phase is Running but which
    // isPodRealRuning does not yet accept.
    Starting int `json:"starting,omitempty"`
    // Running is the number of pods whose phase is Running and which
    // isPodRealRuning accepts.
    Running int `json:"running,omitempty"`
    // Failed is the number of pods in the Failed phase.
    Failed int `json:"failed,omitempty"`
    // Succeeded is the number of pods in the Succeeded phase.
    Succeeded int `json:"succeeded,omitempty"`
    // Unknown is presumably the number of pods in an unknown phase; none of the
    // code shown alongside this type writes it.
    Unknown int `json:"unknown,omitempty"`
    // Refs is the list of object references to the counted pods.
    Refs []corev1.ObjectReference `json:"refs,omitempty"`
}


// GetSpecs returns the job's resource specs keyed by resource type. The keys
// ResourcePS, ResourceWorker and ResourceHeter are always present; each value is
// the corresponding pdj.Spec pointer, which may be nil.
func (pdj *PaddleJob) GetSpecs() map[string]*ResourceSpec {
    specs := make(map[string]*ResourceSpec, 3)
    specs[ResourcePS] = pdj.Spec.PS
    specs[ResourceWorker] = pdj.Spec.Worker
    specs[ResourceHeter] = pdj.Spec.Heter
    return specs
}

// GetStatuses returns the job's resource statuses keyed by resource type. The
// keys ResourcePS, ResourceWorker and ResourceHeter are always present; each
// value is the corresponding pdj.Status pointer, which may be nil.
func (pdj *PaddleJob) GetStatuses() map[string]*ResourceStatus {
    statuses := make(map[string]*ResourceStatus, 3)
    statuses[ResourcePS] = pdj.Status.PS
    statuses[ResourceWorker] = pdj.Status.Worker
    statuses[ResourceHeter] = pdj.Status.Heter
    return statuses
}


//----------------------------- 
//----------------------------- 
// pdj的一些状态设置函数
//----------------------------- 
//----------------------------- 
// getPaddleJobPhase derives the job phase from the current pdj.Status.
//
// Completed and Failed are final and are returned unchanged. Otherwise the
// per-resource statuses decide, in this priority order across ALL resource
// types: any failed pod gives Failed, any pending pod gives Pending, any
// starting pod gives Starting. The checks are made per state rather than per
// resource so the result does not depend on Go's randomized map iteration order
// when different resource types are in different states. If none of these
// apply, the job is Running when every resource passes isRunning, Completed when
// every resource passes isCompleted, and otherwise keeps its current phase
// (Pending if none has been set yet).
func getPaddleJobPhase(pdj *pdv1.PaddleJob) pdv1.PaddleJobPhase {

    // final phase won't change any more
    if pdj.Status.Phase == pdv1.Completed {
        return pdv1.Completed
    } else if pdj.Status.Phase == pdv1.Failed {
        return pdv1.Failed
    }

    specs := pdj.GetSpecs()
    statuses := pdj.GetStatuses()

    // checkAny reports whether at least one resource status satisfies check.
    checkAny := func(check func(status *pdv1.ResourceStatus) bool) bool {
        for _, status := range statuses {
            if check(status) {
                return true
            }
        }
        return false
    }
    switch {
    case checkAny(isFailed):
        return pdv1.Failed
    case checkAny(isPending):
        return pdv1.Pending
    case checkAny(isStarting):
        return pdv1.Starting
    }

    // checkAll reports whether every resource satisfies check against its spec.
    checkAll := func(check func(spec *pdv1.ResourceSpec, status *pdv1.ResourceStatus) bool) bool {
        for k, status := range statuses {
            if !check(specs[k], status) {
                return false
            }
        }
        return true
    }
    if checkAll(isRunning) {
        return pdv1.Running
    }
    if checkAll(isCompleted) {
        return pdv1.Completed
    }

    if pdj.Status.Phase == "" {
        return pdv1.Pending
    }

    return pdj.Status.Phase
}

// getPaddleJobStartTime returns the job's start time. An already recorded start
// time is kept; if none is recorded and the current phase is Running, the
// current time is returned.
func getPaddleJobStartTime(pdj *pdv1.PaddleJob) *metav1.Time {
    started := pdj.Status.StartTime
    if !started.IsZero() || pdj.Status.Phase != pdv1.Running {
        return started
    }
    now := metav1.Now()
    return &now
}

// getPaddleJobCompleteTime returns the job's completion time. An already
// recorded completion time is kept; if none is recorded and the current phase is
// Completed or Failed, the current time is returned.
func getPaddleJobCompleteTime(pdj *pdv1.PaddleJob) *metav1.Time {
    finished := pdj.Status.CompletionTime
    if !finished.IsZero() {
        return finished
    }
    switch pdj.Status.Phase {
    case pdv1.Completed, pdv1.Failed:
        now := metav1.Now()
        return &now
    }
    return finished
}

// getPaddleJobMode derives the job mode from the spec: PS mode when a PS spec is
// present, Collective mode when there is a worker spec with more than one
// replica, and Single mode otherwise.
func getPaddleJobMode(pdj *pdv1.PaddleJob) pdv1.PaddleJobMode {
    switch {
    case pdj.Spec.PS != nil:
        return pdv1.PaddleJobModePS
    case pdj.Spec.Worker != nil && pdj.Spec.Worker.Replicas > 1:
        return pdv1.PaddleJobModeCollective
    default:
        return pdv1.PaddleJobModeSingle
    }
}