Kubelet volume manager source code analysis

  • Kubernetes version: 1.20.0

Overview

Volume module diagram (figure)

The kubelet calls the VolumeManager to prepare storage for pods: once a storage device is ready, it is mounted onto the node where the pod runs, and then mounted into the container's specified directory when the container starts; storage that is no longer in use is unmounted and removed. Kubernetes uses volume plugins to implement the mounting and related operations on storage volumes.

volume manager

Source location: kubernetes/pkg/kubelet/volumemanager

const (
    // reconcilerLoopSleepPeriod is the amount of time the reconciler loop waits
    // between successive executions
    reconcilerLoopSleepPeriod = 100 * time.Millisecond

    // desiredStateOfWorldPopulatorLoopSleepPeriod is the amount of time the
    // DesiredStateOfWorldPopulator loop waits between successive executions
    desiredStateOfWorldPopulatorLoopSleepPeriod = 100 * time.Millisecond

    // desiredStateOfWorldPopulatorGetPodStatusRetryDuration is the amount of
    // time the DesiredStateOfWorldPopulator loop waits between successive pod
    // cleanup calls (to prevent calling containerruntime.GetPodStatus too
    // frequently).
    desiredStateOfWorldPopulatorGetPodStatusRetryDuration = 2 * time.Second

    // podAttachAndMountTimeout is the maximum amount of time the
    // WaitForAttachAndMount call will wait for all volumes in the specified pod
    // to be attached and mounted. Even though cloud operations can take several
    // minutes to complete, we set the timeout to 2 minutes because kubelet
    // will retry in the next sync iteration. This frees the associated
    // goroutine of the pod to process newer updates if needed (e.g., a delete
    // request to the pod).
    // Value is slightly offset from 2 minutes to make timeouts due to this
    // constant recognizable.
    podAttachAndMountTimeout = 2*time.Minute + 3*time.Second

    // podAttachAndMountRetryInterval is the amount of time the GetVolumesForPod
    // call waits before retrying
    podAttachAndMountRetryInterval = 300 * time.Millisecond

    // waitForAttachTimeout is the maximum amount of time a
    // operationexecutor.Mount call will wait for a volume to be attached.
    // Set to 10 minutes because we've seen attach operations take several
    // minutes to complete for some volume plugins in some cases. While this
    // operation is waiting it only blocks other operations on the same device,
    // other devices are not affected.
    waitForAttachTimeout = 10 * time.Minute
)

// VolumeManager runs a set of asynchronous loops that figure out which volumes
// need to be attached/mounted/unmounted/detached based on the pods scheduled on
// this node and makes it so.
type VolumeManager interface {
    // Starts the volume manager and all the asynchronous loops that it controls
    Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})

    // WaitForAttachAndMount processes the volumes referenced in the specified
    // pod and blocks until they are all attached and mounted (reflected in
    // actual state of the world).
    // An error is returned if all volumes are not attached and mounted within
    // the duration defined in podAttachAndMountTimeout.
    WaitForAttachAndMount(pod *v1.Pod) error

    // GetMountedVolumesForPod returns a VolumeMap containing the volumes
    // referenced by the specified pod that are successfully attached and
    // mounted. The key in the map is the OuterVolumeSpecName (i.e.
    // pod.Spec.Volumes[x].Name). It returns an empty VolumeMap if pod has no
    // volumes.
    GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap

    // GetExtraSupplementalGroupsForPod returns a list of the extra
    // supplemental groups for the Pod. These extra supplemental groups come
    // from annotations on persistent volumes that the pod depends on.
    GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64

    // GetVolumesInUse returns a list of all volumes that implement the volume.Attacher
    // interface and are currently in use according to the actual and desired
    // state of the world caches. A volume is considered "in use" as soon as it
    // is added to the desired state of world, indicating it *should* be
    // attached to this node and remains "in use" until it is removed from both
    // the desired state of the world and the actual state of the world, or it
    // has been unmounted (as indicated in actual state of world).
    GetVolumesInUse() []v1.UniqueVolumeName

    // ReconcilerStatesHasBeenSynced returns true only after the actual states in reconciler
    // has been synced at least once after kubelet starts so that it is safe to update mounted
    // volume list retrieved from actual state.
    ReconcilerStatesHasBeenSynced() bool

    // VolumeIsAttached returns true if the given volume is attached to this
    // node.
    VolumeIsAttached(volumeName v1.UniqueVolumeName) bool

    // Marks the specified volume as having successfully been reported as "in
    // use" in the nodes's volume status.
    MarkVolumesAsReportedInUse(volumesReportedAsInUse []v1.UniqueVolumeName)
}



// volumeManager implements the VolumeManager interface
type volumeManager struct {
    // kubeClient is the kube API client used by DesiredStateOfWorldPopulator to
    // communicate with the API server to fetch PV and PVC objects
    kubeClient clientset.Interface

    // volumePluginMgr is the volume plugin manager used to access volume
    // plugins. It must be pre-initialized.
    volumePluginMgr *volume.VolumePluginMgr

    // desiredStateOfWorld is a data structure containing the desired state of
    // the world according to the volume manager: i.e. what volumes should be
    // attached and which pods are referencing the volumes).
    // The data structure is populated by the desired state of the world
    // populator using the kubelet pod manager.
    desiredStateOfWorld cache.DesiredStateOfWorld

    // actualStateOfWorld is a data structure containing the actual state of
    // the world according to the manager: i.e. which volumes are attached to
    // this node and what pods the volumes are mounted to.
    // The data structure is populated upon successful completion of attach,
    // detach, mount, and unmount actions triggered by the reconciler.
    actualStateOfWorld cache.ActualStateOfWorld

    // operationExecutor is used to start asynchronous attach, detach, mount,
    // and unmount operations.
    operationExecutor operationexecutor.OperationExecutor

    // reconciler runs an asynchronous periodic loop to reconcile the
    // desiredStateOfWorld with the actualStateOfWorld by triggering attach,
    // detach, mount, and unmount operations using the operationExecutor.
    reconciler reconciler.Reconciler

    // desiredStateOfWorldPopulator runs an asynchronous periodic loop to
    // populate the desiredStateOfWorld using the kubelet PodManager.
    desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator

    // csiMigratedPluginManager keeps track of CSI migration status of plugins
    csiMigratedPluginManager csimigration.PluginManager

    // intreeToCSITranslator translates in-tree volume specs to CSI
    intreeToCSITranslator csimigration.InTreeToCSITranslator
}


Notes on the VolumeManager interface

  • The component inside the kubelet that gets storage ready; its main job is mount/unmount (attach/detach is optional)
  • Volume operations happen only after a pod has been scheduled onto this node, so the trigger side is the kubelet (strictly speaking, the kubelet's pod manager), which initiates mount operations based on the storage declared in the pod spec held by the Pod Manager
  • The kubelet watches for pods scheduled onto this node and caches them in the Pod Manager; the VolumeManager obtains PV/PVC state through the Pod Manager, works out the concrete attach/detach and mount/unmount operations, and calls the appropriate plugin to carry them out (a minimal sketch of the blocking wait used by WaitForAttachAndMount follows)
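To make the blocking behavior concrete, below is a minimal, self-contained sketch (my own illustration, not the kubelet implementation) of the polling pattern WaitForAttachAndMount relies on: re-check every podAttachAndMountRetryInterval whether the pod's volumes are attached and mounted, and give up after podAttachAndMountTimeout. The volumesReady stub stands in for the real check against actualStateOfWorld.

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    polls := 0
    // Stub for "are all of this pod's volumes attached and mounted?"; the
    // real check consults the actualStateOfWorld cache.
    volumesReady := func() (bool, error) {
        polls++
        return polls >= 3, nil // pretend the mounts complete on the third poll
    }

    // Poll every 300ms and time out after 2m3s, mirroring
    // podAttachAndMountRetryInterval and podAttachAndMountTimeout above.
    err := wait.PollImmediate(300*time.Millisecond, 2*time.Minute+3*time.Second, volumesReady)
    fmt.Println("volumes ready:", err == nil)
}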

The volumeManager struct

The volumeManager struct implements the VolumeManager interface. Two fields deserve particular attention:

  • desiredStateOfWorld: the desired state, i.e. which volumes should be attached and which pods reference each volume
  • actualStateOfWorld: the actual state, i.e. which node each volume is attached to and which pods have the volume mounted

desiredStateOfWorld and actualStateOfWorld

  • desiredStateOfWorld is the ideal view of the volumes; it is built mainly from the pod information obtained through the podManager, from which the volume information is extracted.

  • actualStateOfWorld is the real, observed view of the volumes.

  • desiredStateOfWorldPopulator builds desiredStateOfWorld by way of the podManager.

  • The reconciler's job is to compare actualStateOfWorld with desiredStateOfWorld and then perform the volume creation, deletion, and modification needed to bring the two into agreement (see the sketch below).
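The division of labor can be boiled down to the following toy sketch (my own illustration, not kubelet code), where both states are just sets of volume names. The real caches additionally track which pods use each volume, device paths, attach state, and so on, but the reconcile idea is the same: unmount what is no longer desired, then mount what is missing.

package main

import "fmt"

// reconcile drives actual toward desired: volumes present in actual but not
// in desired are unmounted; volumes in desired but not in actual are mounted.
func reconcile(desired, actual map[string]bool) {
    // Unmounts are triggered before mounts (same ordering as the real reconciler).
    for v := range actual {
        if !desired[v] {
            fmt.Println("unmount", v)
            delete(actual, v)
        }
    }
    for v := range desired {
        if !actual[v] {
            fmt.Println("mount", v)
            actual[v] = true
        }
    }
}

func main() {
    desired := map[string]bool{"pvc-a": true, "pvc-b": true}
    actual := map[string]bool{"pvc-b": true, "pvc-stale": true}
    reconcile(desired, actual) // unmounts pvc-stale, mounts pvc-a
}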

Flow

Construction

NewVolumeManager mainly constructs several volume controllers:

  • volumePluginMgr and csiMigratedPluginManager
  • desiredStateOfWorldPopulator
  • reconciler
// NewMainKubelet instantiates a new Kubelet object along with all the required internal modules.
// No initialization of Kubelet and its modules should happen here.
func NewMainKubelet( /* parameters elided */ ) {

    // ......

    // setup volumeManager
    klet.volumeManager = volumemanager.NewVolumeManager(
        kubeCfg.EnableControllerAttachDetach,
        nodeName,
        klet.podManager,
        klet.statusManager,
        klet.kubeClient,
        klet.volumePluginMgr,
        klet.containerRuntime,
        kubeDeps.Mounter,
        kubeDeps.HostUtil,
        klet.getPodsDir(),
        kubeDeps.Recorder,
        experimentalCheckNodeCapabilitiesBeforeMount,
        keepTerminatedPodVolumes,
        volumepathhandler.NewBlockVolumePathHandler())

    // ......
}


// NewVolumeManager returns a new concrete instance implementing the
// VolumeManager interface.
//
// kubeClient - kubeClient is the kube API client used by DesiredStateOfWorldPopulator
//   to communicate with the API server to fetch PV and PVC objects
// volumePluginMgr - the volume plugin manager used to access volume plugins.
//   Must be pre-initialized.
func NewVolumeManager( /* parameters elided */ ) VolumeManager {

    vm := &volumeManager{
        kubeClient:          kubeClient,
        volumePluginMgr:     volumePluginMgr,
        desiredStateOfWorld: cache.NewDesiredStateOfWorld(volumePluginMgr),
        actualStateOfWorld:  cache.NewActualStateOfWorld(nodeName, volumePluginMgr),
        operationExecutor: operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
            kubeClient,
            volumePluginMgr,
            recorder,
            checkNodeCapabilitiesBeforeMount,
            blockVolumePathHandler)),
    }

    intreeToCSITranslator := csitrans.New()
    csiMigratedPluginManager := csimigration.NewPluginManager(intreeToCSITranslator)

    vm.intreeToCSITranslator = intreeToCSITranslator
    vm.csiMigratedPluginManager = csiMigratedPluginManager
    vm.desiredStateOfWorldPopulator = populator.NewDesiredStateOfWorldPopulator(
        kubeClient,
        desiredStateOfWorldPopulatorLoopSleepPeriod,
        desiredStateOfWorldPopulatorGetPodStatusRetryDuration,
        podManager,
        podStatusProvider,
        vm.desiredStateOfWorld,
        vm.actualStateOfWorld,
        kubeContainerRuntime,
        keepTerminatedPodVolumes,
        csiMigratedPluginManager,
        intreeToCSITranslator,
        volumePluginMgr)
    vm.reconciler = reconciler.NewReconciler(
        kubeClient,
        controllerAttachDetachEnabled,
        reconcilerLoopSleepPeriod,
        waitForAttachTimeout,
        nodeName,
        vm.desiredStateOfWorld,
        vm.actualStateOfWorld,
        vm.desiredStateOfWorldPopulator.HasAddedPods,
        vm.operationExecutor,
        mounter,
        hostutil,
        volumePluginMgr,
        kubeletPodsDir)

    return vm
}



// NewDesiredStateOfWorldPopulator returns a new instance of
// DesiredStateOfWorldPopulator.
//
// kubeClient - used to fetch PV and PVC objects from the API server
// loopSleepDuration - the amount of time the populator loop sleeps between
//     successive executions
// podManager - the kubelet podManager that is the source of truth for the pods
//     that exist on this host
// desiredStateOfWorld - the cache to populate
func NewDesiredStateOfWorldPopulator(
    kubeClient clientset.Interface,
    loopSleepDuration time.Duration,
    getPodStatusRetryDuration time.Duration,
    podManager pod.Manager,
    podStatusProvider status.PodStatusProvider,
    desiredStateOfWorld cache.DesiredStateOfWorld,
    actualStateOfWorld cache.ActualStateOfWorld,
    kubeContainerRuntime kubecontainer.Runtime,
    keepTerminatedPodVolumes bool,
    csiMigratedPluginManager csimigration.PluginManager,
    intreeToCSITranslator csimigration.InTreeToCSITranslator,
    volumePluginMgr *volume.VolumePluginMgr) DesiredStateOfWorldPopulator {
    return &desiredStateOfWorldPopulator{
        kubeClient:                kubeClient,
        loopSleepDuration:         loopSleepDuration,
        getPodStatusRetryDuration: getPodStatusRetryDuration,
        podManager:                podManager,
        podStatusProvider:         podStatusProvider,
        desiredStateOfWorld:       desiredStateOfWorld,
        actualStateOfWorld:        actualStateOfWorld,
        pods: processedPods{
            processedPods: make(map[volumetypes.UniquePodName]bool)},
        kubeContainerRuntime:     kubeContainerRuntime,
        keepTerminatedPodVolumes: keepTerminatedPodVolumes,
        hasAddedPods:             false,
        hasAddedPodsLock:         sync.RWMutex{},
        csiMigratedPluginManager: csiMigratedPluginManager,
        intreeToCSITranslator:    intreeToCSITranslator,
        volumePluginMgr:          volumePluginMgr,
    }
}

type desiredStateOfWorldPopulator struct {
    kubeClient                clientset.Interface
    loopSleepDuration         time.Duration
    getPodStatusRetryDuration time.Duration
    podManager                pod.Manager
    podStatusProvider         status.PodStatusProvider
    desiredStateOfWorld       cache.DesiredStateOfWorld
    actualStateOfWorld        cache.ActualStateOfWorld
    pods                      processedPods
    kubeContainerRuntime      kubecontainer.Runtime
    timeOfLastGetPodStatus    time.Time
    keepTerminatedPodVolumes  bool
    hasAddedPods              bool
    hasAddedPodsLock          sync.RWMutex
    csiMigratedPluginManager  csimigration.PluginManager
    intreeToCSITranslator     csimigration.InTreeToCSITranslator
    volumePluginMgr           *volume.VolumePluginMgr
}



// NewReconciler returns a new instance of Reconciler.
//
// controllerAttachDetachEnabled - if true, indicates that the attach/detach
//   controller is responsible for managing the attach/detach operations for
//   this node, and therefore the volume manager should not
// loopSleepDuration - the amount of time the reconciler loop sleeps between
//   successive executions
// waitForAttachTimeout - the amount of time the Mount function will wait for
//   the volume to be attached
// nodeName - the Name for this node, used by Attach and Detach methods
// desiredStateOfWorld - cache containing the desired state of the world
// actualStateOfWorld - cache containing the actual state of the world
// populatorHasAddedPods - checker for whether the populator has finished
//   adding pods to the desiredStateOfWorld cache at least once after sources
//   are all ready (before sources are ready, pods are probably missing)
// operationExecutor - used to trigger attach/detach/mount/unmount operations
//   safely (prevents more than one operation from being triggered on the same
//   volume)
// mounter - mounter passed in from kubelet, passed down unmount path
// hostutil - hostutil passed in from kubelet
// volumePluginMgr - volume plugin manager passed from kubelet
func NewReconciler(
    kubeClient clientset.Interface,
    controllerAttachDetachEnabled bool,
    loopSleepDuration time.Duration,
    waitForAttachTimeout time.Duration,
    nodeName types.NodeName,
    desiredStateOfWorld cache.DesiredStateOfWorld,
    actualStateOfWorld cache.ActualStateOfWorld,
    populatorHasAddedPods func() bool,
    operationExecutor operationexecutor.OperationExecutor,
    mounter mount.Interface,
    hostutil hostutil.HostUtils,
    volumePluginMgr *volumepkg.VolumePluginMgr,
    kubeletPodsDir string) Reconciler {
    return &reconciler{
        kubeClient:                    kubeClient,
        controllerAttachDetachEnabled: controllerAttachDetachEnabled,
        loopSleepDuration:             loopSleepDuration,
        waitForAttachTimeout:          waitForAttachTimeout,
        nodeName:                      nodeName,
        desiredStateOfWorld:           desiredStateOfWorld,
        actualStateOfWorld:            actualStateOfWorld,
        populatorHasAddedPods:         populatorHasAddedPods,
        operationExecutor:             operationExecutor,
        mounter:                       mounter,
        hostutil:                      hostutil,
        volumePluginMgr:               volumePluginMgr,
        kubeletPodsDir:                kubeletPodsDir,
        timeOfLastSync:                time.Time{},
    }
}

type reconciler struct {
    kubeClient                    clientset.Interface
    controllerAttachDetachEnabled bool
    loopSleepDuration             time.Duration
    waitForAttachTimeout          time.Duration
    nodeName                      types.NodeName
    desiredStateOfWorld           cache.DesiredStateOfWorld
    actualStateOfWorld            cache.ActualStateOfWorld
    populatorHasAddedPods         func() bool
    operationExecutor             operationexecutor.OperationExecutor
    mounter                       mount.Interface
    hostutil                      hostutil.HostUtils
    volumePluginMgr               *volumepkg.VolumePluginMgr
    kubeletPodsDir                string
    timeOfLastSync                time.Time
}

Startup

kl.volumeManager.Run

// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {

    // Start volume manager
    go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

    go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

    // Set up iptables util rules
    if kl.makeIPTablesUtilChains {
        kl.initNetworkUtil()
    }

    // Start a goroutine responsible for killing pods (that are not properly
    // handled by pod workers).
    go wait.Until(kl.podKiller.PerformPodKillingWork, 1*time.Second, wait.NeverStop)

    // Start component sync loops.
    kl.statusManager.Start()
    kl.probeManager.Start()

    // Start syncing RuntimeClasses if enabled.
    if kl.runtimeClassManager != nil {
        kl.runtimeClassManager.Start(wait.NeverStop)
    }

    // Start the pod lifecycle event generator.
    kl.pleg.Start()
    kl.syncLoop(updates, kl)
}


func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
    defer runtime.HandleCrash()

    if vm.kubeClient != nil {
        // start informer for CSIDriver
        go vm.volumePluginMgr.Run(stopCh)
    }

    go vm.desiredStateOfWorldPopulator.Run(sourcesReady, stopCh)
    klog.V(2).Infof("The desired_state_of_world populator starts")

    klog.Infof("Starting Kubelet Volume Manager")
    go vm.reconciler.Run(stopCh)

    metrics.Register(vm.actualStateOfWorld, vm.desiredStateOfWorld, vm.volumePluginMgr)

    <-stopCh
    klog.Infof("Shutting down Kubelet Volume Manager")
}

The submodules started here are:

  • If kubeClient is non-nil, start volumePluginMgr, which runs the informer for CSIDriver objects
  • Start desiredStateOfWorldPopulator: updates DesiredStateOfWorld from the pod information synced from the apiserver
    • findAndAddNewPods()
    • findAndRemoveDeletedPods(): executed at most once per dswp.getPodStatusRetryDuration
  • Start reconciler: the coordinator between desired and actual state, responsible for driving the actual state toward the desired state (a simplified sketch of the startup gating follows)
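The gating between the populator and the reconciler can be illustrated with the simplified sketch below (assumptions: allReady stands in for config.SourcesReady.AllReady(), and populatorLoop is a stub). The populator keeps looping while polling for source readiness; only once all pod sources are ready does it flip hasAddedPods to true, which is what allows the reconciler to run its one-time sync/reconstruct pass.

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    stopCh := make(chan struct{})
    start := time.Now()

    // Stand-in for config.SourcesReady.AllReady(): pretend all pod sources
    // (file, http, apiserver) become ready after one second.
    allReady := func() bool { return time.Since(start) > time.Second }

    populatorLoop := func() { fmt.Println("populator tick") }

    // Loop while polling for readiness; PollUntil returns once allReady is true.
    wait.PollUntil(100*time.Millisecond, func() (bool, error) {
        populatorLoop()
        return allReady(), nil
    }, stopCh)
    fmt.Println("hasAddedPods = true; the reconciler may now sync state")

    // From here on, just keep running populatorLoop periodically.
    go wait.Until(populatorLoop, 100*time.Millisecond, stopCh)
    time.Sleep(300 * time.Millisecond)
    close(stopCh)
}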

desiredStateOfWorldPopulator

DesiredStateOfWorld is updated via populatorLoop():


func (dswp *desiredStateOfWorldPopulator) populatorLoop() {
    dswp.findAndAddNewPods()

    // findAndRemoveDeletedPods() calls out to the container runtime to
    // determine if the containers for a given pod are terminated. This is
    // an expensive operation, therefore we limit the rate that
    // findAndRemoveDeletedPods() is called independently of the main
    // populator loop.
    if time.Since(dswp.timeOfLastGetPodStatus) < dswp.getPodStatusRetryDuration {
        klog.V(5).Infof(
            "Skipping findAndRemoveDeletedPods(). Not permitted until %v (getPodStatusRetryDuration %v).",
            dswp.timeOfLastGetPodStatus.Add(dswp.getPodStatusRetryDuration),
            dswp.getPodStatusRetryDuration)

        return
    }

    dswp.findAndRemoveDeletedPods()
}
findAndAddNewPods

  • Iterate over all pods in the pod manager
  • Skip pods in the Terminated state; for the remaining pods, call processPodVolumes to add them to the desired state of the world

In short, it fetches all pods via the podManager and calls processPodVolumes to update desiredStateOfWorld. Note that this only adds volume information for newly added pods.


// Iterate through all pods and add to desired state of world if they don't
// exist but should
func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() {
    // Map unique pod name to outer volume name to MountedVolume.
    mountedVolumesForPod := make(map[volumetypes.UniquePodName]map[string]cache.MountedVolume)
    if utilfeature.DefaultFeatureGate.Enabled(features.ExpandInUsePersistentVolumes) {
        for _, mountedVolume := range dswp.actualStateOfWorld.GetMountedVolumes() {
            mountedVolumes, exist := mountedVolumesForPod[mountedVolume.PodName]
            if !exist {
                mountedVolumes = make(map[string]cache.MountedVolume)
                mountedVolumesForPod[mountedVolume.PodName] = mountedVolumes
            }
            mountedVolumes[mountedVolume.OuterVolumeSpecName] = mountedVolume
        }
    }

    processedVolumesForFSResize := sets.NewString()
    for _, pod := range dswp.podManager.GetPods() {
        if dswp.isPodTerminated(pod) {
            // Do not (re)add volumes for terminated pods
            continue
        }
        dswp.processPodVolumes(pod, mountedVolumesForPod, processedVolumesForFSResize)
    }
}

processPodVolumes

Updates desiredStateOfWorld:


// processPodVolumes processes the volumes in the given pod and adds them to the
// desired state of the world.
func (dswp *desiredStateOfWorldPopulator) processPodVolumes(
    pod *v1.Pod,
    mountedVolumesForPod map[volumetypes.UniquePodName]map[string]cache.MountedVolume,
    processedVolumesForFSResize sets.String) {
    if pod == nil {
        return
    }

    uniquePodName := util.GetUniquePodName(pod)
    // If the pod was already in the processedPods map, there is nothing to do; return early
    if dswp.podPreviouslyProcessed(uniquePodName) {
        return
    }

    allVolumesAdded := true
    // Collect the volumeMounts and volumeDevices of all containers in the pod
    // into the mounts/devices sets
    mounts, devices := util.GetPodVolumeNames(pod)

    expandInUsePV := utilfeature.DefaultFeatureGate.Enabled(features.ExpandInUsePersistentVolumes)
    // Process volume spec for each volume defined in pod
    for _, podVolume := range pod.Spec.Volumes {
        if !mounts.Has(podVolume.Name) && !devices.Has(podVolume.Name) {
            // Volume is not used in the pod, ignore it.
            // pod.Spec.Volumes[x].Name is declared but no container mounts it, so it is skipped
            klog.V(4).Infof("Skipping unused volume %q for pod %q", podVolume.Name, format.Pod(pod))
            continue
        }
        // createVolumeSpec creates and returns a mutable volume.Spec for this volume;
        // if necessary it resolves the PVC indirection to obtain the PV object, and it
        // returns an error when the volume cannot be obtained
        pvc, volumeSpec, volumeGidValue, err :=
            dswp.createVolumeSpec(podVolume, pod, mounts, devices)
        if err != nil {
            klog.Errorf(
                "Error processing volume %q for pod %q: %v",
                podVolume.Name,
                format.Pod(pod),
                err)
            dswp.desiredStateOfWorld.AddErrorToPod(uniquePodName, err.Error())
            allVolumesAdded = false
            continue
        }

        // Add volume to desired state of world
        // AddPodToVolume calls FindPluginBySpec to locate the volume plugin for the volume.Spec,
        // uses isAttachableVolume to check whether the plugin requires attach (not every plugin
        // implements the AttachableVolumePlugin interface), and records the volume-to-pod relationship.
        // Later the pod name is marked processed and actual_state_of_world is flagged for remount.
        _, err = dswp.desiredStateOfWorld.AddPodToVolume(
            uniquePodName, pod, volumeSpec, podVolume.Name, volumeGidValue)
        if err != nil {
            klog.Errorf(
                "Failed to add volume %s (specName: %s) for pod %q to desiredStateOfWorld: %v",
                podVolume.Name,
                volumeSpec.Name(),
                uniquePodName,
                err)
            dswp.desiredStateOfWorld.AddErrorToPod(uniquePodName, err.Error())
            allVolumesAdded = false
        } else {
            klog.V(4).Infof(
                "Added volume %q (volSpec=%q) for pod %q to desired state.",
                podVolume.Name,
                volumeSpec.Name(),
                uniquePodName)
        }
        // Check whether an in-use volume resize is pending; effectively compares
        // pvc.Status.Capacity with pvc.Spec.Capacity and expands when pvc.Spec.Capacity > pvc.Status.Capacity
        if expandInUsePV {
            dswp.checkVolumeFSResize(pod, podVolume, pvc, volumeSpec,
                uniquePodName, mountedVolumesForPod, processedVolumesForFSResize)
        }
    }

    // some of the volume additions may have failed, should not mark this pod as fully processed
    if allVolumesAdded {
        dswp.markPodProcessed(uniquePodName)
        // New pod has been synced. Re-mount all volumes that need it
        // (e.g. DownwardAPI)
        dswp.actualStateOfWorld.MarkRemountRequired(uniquePodName)
        // Remove any stored errors for the pod, everything went well in this processPodVolumes
        dswp.desiredStateOfWorld.PopPodErrors(uniquePodName)
    } else if dswp.podHasBeenSeenOnce(uniquePodName) {
        // For the Pod which has been processed at least once, even though some volumes
        // may not have been reprocessed successfully this round, we still mark it as processed to avoid
        // processing it at a very high frequency. The pod will be reprocessed when volume manager calls
        // ReprocessPod() which is triggered by SyncPod.
        dswp.markPodProcessed(uniquePodName)
    }

}

findAndRemoveDeletedPods

  • findAndRemoveDeletedPods is relatively expensive, so the interval since the last execution is checked first.
  • Iterate over the volumes returned by desiredStateOfWorld.GetVolumesToMount() and use volumeToMount.Pod to check whether the owning pod still exists in the podManager.
    • If the pod exists (podExists), check whether it has terminated: if it is still running, skip it (also skip if keepTerminatedPodVolumes is set)
    • Use the containerRuntime to check whether all containers of the pod have terminated: if any container is still running, skip it
    • Check actualStateOfWorld.PodExistsInVolume: if the actual state has no mount record for this pod's volume but the pod manager still has the pod, skip it
    • Remove the pod's mounted volume from the manager: desiredStateOfWorld.DeletePodFromVolume(volumeToMount.PodName, volumeToMount.VolumeName)
    • Remove the pod's record (desiredStateOfWorldPopulator.pods[volumeToMount.PodName]): deleteProcessedPod(volumeToMount.PodName)

In short, for pods that no longer exist in the pod manager, findAndRemoveDeletedPods removes their pod and volume records from desiredStateOfWorld.

// Iterate through all pods in desired state of world, and remove if they no
// longer exist
func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
    var runningPods []*kubecontainer.Pod

    runningPodsFetched := false
    for _, volumeToMount := range dswp.desiredStateOfWorld.GetVolumesToMount() {
        pod, podExists := dswp.podManager.GetPodByUID(volumeToMount.Pod.UID)
        if podExists {

            // check if the attachability has changed for this volume
            if volumeToMount.PluginIsAttachable {
                attachableVolumePlugin, err := dswp.volumePluginMgr.FindAttachablePluginBySpec(volumeToMount.VolumeSpec)
                // only this means the plugin is truly non-attachable
                if err == nil && attachableVolumePlugin == nil {
                    // It is not possible right now for a CSI plugin to be both attachable and non-deviceMountable
                    // So the uniqueVolumeName should remain the same after the attachability change
                    dswp.desiredStateOfWorld.MarkVolumeAttachability(volumeToMount.VolumeName, false)
                    klog.Infof("Volume %v changes from attachable to non-attachable.", volumeToMount.VolumeName)
                    continue
                }
            }

            // Skip running pods
            if !dswp.isPodTerminated(pod) {
                continue
            }
            if dswp.keepTerminatedPodVolumes {
                continue
            }
        }

        // Once a pod has been deleted from kubelet pod manager, do not delete
        // it immediately from volume manager. Instead, check the kubelet
        // containerRuntime to verify that all containers in the pod have been
        // terminated.
        if !runningPodsFetched {
            var getPodsErr error
            runningPods, getPodsErr = dswp.kubeContainerRuntime.GetPods(false)
            if getPodsErr != nil {
                klog.Errorf(
                    "kubeContainerRuntime.findAndRemoveDeletedPods returned error %v.",
                    getPodsErr)
                continue
            }

            runningPodsFetched = true
            dswp.timeOfLastGetPodStatus = time.Now()
        }

        runningContainers := false
        for _, runningPod := range runningPods {
            if runningPod.ID == volumeToMount.Pod.UID {
                if len(runningPod.Containers) > 0 {
                    runningContainers = true
                }

                break
            }
        }

        if runningContainers {
            klog.V(4).Infof(
                "Pod %q still has one or more containers in the non-exited state. Therefore, it will not be removed from desired state.",
                format.Pod(volumeToMount.Pod))
            continue
        }
        exists, _, _ := dswp.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
        if !exists && podExists {
            klog.V(4).Infof(
                volumeToMount.GenerateMsgDetailed(fmt.Sprintf("Actual state has not yet has this volume mounted information and pod (%q) still exists in pod manager, skip removing volume from desired state",
                    format.Pod(volumeToMount.Pod)), ""))
            continue
        }
        klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("Removing volume from desired state", ""))

        dswp.desiredStateOfWorld.DeletePodFromVolume(
            volumeToMount.PodName, volumeToMount.VolumeName)
        dswp.deleteProcessedPod(volumeToMount.PodName)
    }

    podsWithError := dswp.desiredStateOfWorld.GetPodsWithErrors()
    for _, podName := range podsWithError {
        if _, podExists := dswp.podManager.GetPodByUID(types.UID(podName)); !podExists {
            dswp.desiredStateOfWorld.PopPodErrors(podName)
        }
    }
}

Notes:

  • If the running pods have not been fetched yet (runningPodsFetched is false), the volume record is not deleted right away. Instead, dswp.kubeContainerRuntime.GetPods(false) is called to fetch pod information from the container runtime, which returns the currently running pods. Since one volume can belong to multiple pods, one pod can contain multiple containers, and every container may use the volume, the populator must scan the container information of the pods that own the volume and confirm no container is still using it before deleting the volume record.

  • At this point desiredStateOfWorld has been built. It is the ideal volume state; no actual volume creation, deletion, mounting, or unmounting has happened yet. The actual operations are performed by reconciler.Run(stopCh).

reconciler

The reconciler synchronizes the volume configuration according to desiredStateOfWorld.

Main flow

  • It is driven by a periodic task; reconcile() is the consistency function that keeps the desired and actual states in agreement.

  • reconcile first takes the mounted volumes from actualStateOfWorld and checks whether each still exists in desiredStateOfWorld; if not, it is unmounted.

  • It then takes the volumes to mount from desiredStateOfWorld, compares them with actualStateOfWorld, and mounts any that are not yet mounted.

  • In this way storage is attached to the host and mounted into the container directory.


func (rc *reconciler) Run(stopCh <-chan struct{}) {
    wait.Until(rc.reconciliationLoopFunc(), rc.loopSleepDuration, stopCh)
}

func (rc *reconciler) reconciliationLoopFunc() func() {
    return func() {
        rc.reconcile()

        // Sync the state with the reality once after all existing pods are added to the desired state from all sources.
        // Otherwise, the reconstruct process may clean up pods' volumes that are still in use because
        // desired state of world does not contain a complete list of pods.
        if rc.populatorHasAddedPods() && !rc.StatesHasBeenSynced() {
            klog.Infof("Reconciler: start to sync state")
            rc.sync()
        }
    }
}

func (rc *reconciler) reconcile() {
    // Unmounts are triggered before mounts so that a volume that was
    // referenced by a pod that was deleted and is now referenced by another
    // pod is unmounted from the first pod before being mounted to the new
    // pod.
    rc.unmountVolumes()

    // Next we mount required volumes. This function could also trigger
    // attach if kubelet is responsible for attaching volumes.
    // If underlying PVC was resized while in-use then this function also handles volume
    // resizing.
    rc.mountAttachVolumes()

    // Ensure devices that should be detached/unmounted are detached/unmounted.
    rc.unmountDetachDevices()
}

func (rc *reconciler) unmountVolumes() {
    // Ensure volumes that should be unmounted are unmounted.
    for _, mountedVolume := range rc.actualStateOfWorld.GetAllMountedVolumes() {
        if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) {
            // Volume is mounted, unmount it
            klog.V(5).Infof(mountedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountVolume", ""))
            // UnmountVolume ends up calling CleanupMountPoint -> doCleanupMountPoint via the concrete unmounter, unmounting the mount point and removing its directory
            // Unmounting the mount directory can fail here (see the upstream discussion of orphaned-pod bugs); in that case the kubelet pod cleanup worker cannot simply delete the mount directory
            err := rc.operationExecutor.UnmountVolume(
                mountedVolume.MountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
            if err != nil &&
                !nestedpendingoperations.IsAlreadyExists(err) &&
                !exponentialbackoff.IsExponentialBackoff(err) {
                // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
                // Log all other errors.
                klog.Errorf(mountedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
            }
            if err == nil {
                klog.Infof(mountedVolume.GenerateMsgDetailed("operationExecutor.UnmountVolume started", ""))
            }
        }
    }
}

func (rc *reconciler) mountAttachVolumes() {
    // Ensure volumes that should be attached/mounted are attached/mounted.
    for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
        volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
        volumeToMount.DevicePath = devicePath
        if cache.IsVolumeNotAttachedError(err) {
            if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {
                // Volume is not attached (or doesn't implement attacher), kubelet attach is disabled, wait
                // for controller to finish attaching volume.
                klog.V(5).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.VerifyControllerAttachedVolume", ""))
                err := rc.operationExecutor.VerifyControllerAttachedVolume(
                    volumeToMount.VolumeToMount,
                    rc.nodeName,
                    rc.actualStateOfWorld)
                if err != nil &&
                    !nestedpendingoperations.IsAlreadyExists(err) &&
                    !exponentialbackoff.IsExponentialBackoff(err) {
                    // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
                    // Log all other errors.
                    klog.Errorf(volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.VerifyControllerAttachedVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
                }
                if err == nil {
                    klog.Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume started", ""))
                }
            } else {
                // Volume is not attached to node, kubelet attach is enabled, volume implements an attacher,
                // so attach it
                volumeToAttach := operationexecutor.VolumeToAttach{
                    VolumeName: volumeToMount.VolumeName,
                    VolumeSpec: volumeToMount.VolumeSpec,
                    NodeName:   rc.nodeName,
                }
                klog.V(5).Infof(volumeToAttach.GenerateMsgDetailed("Starting operationExecutor.AttachVolume", ""))
                err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)
                if err != nil &&
                    !nestedpendingoperations.IsAlreadyExists(err) &&
                    !exponentialbackoff.IsExponentialBackoff(err) {
                    // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
                    // Log all other errors.
                    klog.Errorf(volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.AttachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
                }
                if err == nil {
                    klog.Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.AttachVolume started", ""))
                }
            }
        } else if !volMounted || cache.IsRemountRequiredError(err) {
            // Volume is not mounted, or is already mounted, but requires remounting
            remountingLogStr := ""
            isRemount := cache.IsRemountRequiredError(err)
            if isRemount {
                remountingLogStr = "Volume is already mounted to pod, but remount was requested."
            }
            klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MountVolume", remountingLogStr))
            err := rc.operationExecutor.MountVolume(
                rc.waitForAttachTimeout,
                volumeToMount.VolumeToMount,
                rc.actualStateOfWorld,
                isRemount)
            if err != nil &&
                !nestedpendingoperations.IsAlreadyExists(err) &&
                !exponentialbackoff.IsExponentialBackoff(err) {
                // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
                // Log all other errors.
                klog.Errorf(volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.MountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
            }
            if err == nil {
                if remountingLogStr == "" {
                    klog.V(1).Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr))
                } else {
                    klog.V(5).Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr))
                }
            }
        } else if cache.IsFSResizeRequiredError(err) &&
            utilfeature.DefaultFeatureGate.Enabled(features.ExpandInUsePersistentVolumes) {
            klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.ExpandInUseVolume", ""))
            err := rc.operationExecutor.ExpandInUseVolume(
                volumeToMount.VolumeToMount,
                rc.actualStateOfWorld)
            if err != nil &&
                !nestedpendingoperations.IsAlreadyExists(err) &&
                !exponentialbackoff.IsExponentialBackoff(err) {
                // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
                // Log all other errors.
                klog.Errorf(volumeToMount.GenerateErrorDetailed("operationExecutor.ExpandInUseVolume failed", err).Error())
            }
            if err == nil {
                klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.ExpandInUseVolume started", ""))
            }
        }
    }
}

func (rc *reconciler) unmountDetachDevices() {
    for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() {
        // Check IsOperationPending to avoid marking a volume as detached if it's in the process of mounting.
        if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) &&
            !rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) {
            if attachedVolume.DeviceMayBeMounted() {
                // Volume is globally mounted to device, unmount it
                klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", ""))
                err := rc.operationExecutor.UnmountDevice(
                    attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.hostutil)
                if err != nil &&
                    !nestedpendingoperations.IsAlreadyExists(err) &&
                    !exponentialbackoff.IsExponentialBackoff(err) {
                    // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
                    // Log all other errors.
                    klog.Errorf(attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountDevice failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
                }
                if err == nil {
                    klog.Infof(attachedVolume.GenerateMsgDetailed("operationExecutor.UnmountDevice started", ""))
                }
            } else {
                // Volume is attached to node, detach it
                // Kubelet not responsible for detaching or this volume has a non-attachable volume plugin.
                if rc.controllerAttachDetachEnabled || !attachedVolume.PluginIsAttachable {
                    rc.actualStateOfWorld.MarkVolumeAsDetached(attachedVolume.VolumeName, attachedVolume.NodeName)
                    klog.Infof(attachedVolume.GenerateMsgDetailed("Volume detached", fmt.Sprintf("DevicePath %q", attachedVolume.DevicePath)))
                } else {
                    // Only detach if kubelet detach is enabled
                    klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.DetachVolume", ""))
                    err := rc.operationExecutor.DetachVolume(
                        attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld)
                    if err != nil &&
                        !nestedpendingoperations.IsAlreadyExists(err) &&
                        !exponentialbackoff.IsExponentialBackoff(err) {
                        // Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
                        // Log all other errors.
                        klog.Errorf(attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.DetachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
                    }
                    if err == nil {
                        klog.Infof(attachedVolume.GenerateMsgDetailed("operationExecutor.DetachVolume started", ""))
                    }
                }
            }
        }
    }
}

// sync process tries to observe the real world by scanning all pods' volume directories from the disk.
// If the actual and desired state of worlds are not consistent with the observed world, it means that some
// mounted volumes are left out probably during kubelet restart. This process will reconstruct
// the volumes and update the actual and desired states. For the volumes that cannot support reconstruction,
// it will try to clean up the mount paths with operation executor.
func (rc *reconciler) sync() {
    defer rc.updateLastSyncTime()
    rc.syncStates()
}

CleanupMountPoint -> doCleanupMountPoint

The concrete volume unmount operation:

  • If the path is a mount point, unmount it first with mounter.Unmount(mountPath)
  • Then remove the directory with os.Remove(mountPath)
// CleanupMountPoint unmounts the given path and deletes the remaining directory
// if successful. If extensiveMountPointCheck is true IsNotMountPoint will be
// called instead of IsLikelyNotMountPoint. IsNotMountPoint is more expensive
// but properly handles bind mounts within the same fs.
func CleanupMountPoint(mountPath string, mounter Interface, extensiveMountPointCheck bool) error {
    pathExists, pathErr := PathExists(mountPath)
    if !pathExists {
        klog.Warningf("Warning: Unmount skipped because path does not exist: %v", mountPath)
        return nil
    }
    corruptedMnt := IsCorruptedMnt(pathErr)
    if pathErr != nil && !corruptedMnt {
        return fmt.Errorf("Error checking path: %v", pathErr)
    }
    return doCleanupMountPoint(mountPath, mounter, extensiveMountPointCheck, corruptedMnt)
}

// doCleanupMountPoint unmounts the given path and
// deletes the remaining directory if successful.
// if extensiveMountPointCheck is true
// IsNotMountPoint will be called instead of IsLikelyNotMountPoint.
// IsNotMountPoint is more expensive but properly handles bind mounts within the same fs.
// if corruptedMnt is true, it means that the mountPath is a corrupted mountpoint, and the mount point check
// will be skipped
func doCleanupMountPoint(mountPath string, mounter Interface, extensiveMountPointCheck bool, corruptedMnt bool) error {
    var notMnt bool
    var err error
    if !corruptedMnt {
        if extensiveMountPointCheck {
            notMnt, err = IsNotMountPoint(mounter, mountPath)
        } else {
            notMnt, err = mounter.IsLikelyNotMountPoint(mountPath)
        }

        if err != nil {
            return err
        }

        if notMnt {
            klog.Warningf("Warning: %q is not a mountpoint, deleting", mountPath)
            return os.Remove(mountPath)
        }
    }

    // Unmount the mount path
    klog.V(4).Infof("%q is a mountpoint, unmounting", mountPath)
    if err := mounter.Unmount(mountPath); err != nil {
        return err
    }

    if extensiveMountPointCheck {
        notMnt, err = IsNotMountPoint(mounter, mountPath)
    } else {
        notMnt, err = mounter.IsLikelyNotMountPoint(mountPath)
    }
    if err != nil {
        return err
    }
    if notMnt {
        klog.V(4).Infof("%q is unmounted, deleting the directory", mountPath)
        return os.Remove(mountPath)
    }
    return fmt.Errorf("Failed to unmount path %v", mountPath)
}

mountVolumeFunc

Executes the plugin's SetUp method and updates the actual state of the world.

pendingOperations
  • pendingOperations is built by nestedpendingoperations.NewNestedPendingOperations; nestedPendingOperations implements the NestedPendingOperations interface, including the Run method (a toy sketch of the deduplication idea follows this list)

  • Path: pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go
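The idea behind nestedpendingoperations can be sketched as a toy (my own illustration; the real implementation also supports exponential backoff and nested volume/pod/node keys): at most one operation may be in flight per key, and a second Run on the same key fails with an "already exists" error, which the reconciler treats as expected and simply retries on a later loop iteration.

package main

import (
    "fmt"
    "sync"
    "time"
)

// pendingOps allows at most one in-flight operation per key.
type pendingOps struct {
    mu       sync.Mutex
    inFlight map[string]bool
}

// Run starts op asynchronously unless an operation with the same key is
// already running, in which case it returns an error immediately.
func (p *pendingOps) Run(key string, op func()) error {
    p.mu.Lock()
    if p.inFlight[key] {
        p.mu.Unlock()
        return fmt.Errorf("operation on %q already exists", key)
    }
    p.inFlight[key] = true
    p.mu.Unlock()

    go func() {
        defer func() {
            p.mu.Lock()
            delete(p.inFlight, key)
            p.mu.Unlock()
        }()
        op()
    }()
    return nil
}

func main() {
    p := &pendingOps{inFlight: map[string]bool{}}
    mount := func() { time.Sleep(100 * time.Millisecond); fmt.Println("mounted") }

    fmt.Println(p.Run("pvc-a/pod-1", mount)) // <nil>: operation started
    fmt.Println(p.Run("pvc-a/pod-1", mount)) // error: still running
    time.Sleep(200 * time.Millisecond)
    fmt.Println(p.Run("pvc-a/pod-1", mount)) // <nil>: previous one finished
}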

mountAttachVolumes

The branches handled:

  • Volume is not attached (or doesn’t implement attacher), kubelet attach is disabled, wait for controller to finish attaching volume.
  • Volume is not attached to node, kubelet attach is enabled, volume implements an attacher, so attach it
  • Volume is not mounted, or is already mounted, but requires remounting
MountVolume

For filesystem volume types, operationGenerator.GenerateMountVolumeFunc(waitForAttachTimeout, volumeToMount, actualStateOfWorld, isRemount) produces the concrete mount operation; see GenerateMountVolumeFunc below.


func (oe *operationExecutor) MountVolume(
    waitForAttachTimeout time.Duration,
    volumeToMount VolumeToMount,
    actualStateOfWorld ActualStateOfWorldMounterUpdater,
    isRemount bool) error {
    fsVolume, err := util.CheckVolumeModeFilesystem(volumeToMount.VolumeSpec)
    if err != nil {
        return err
    }
    var generatedOperations volumetypes.GeneratedOperations
    if fsVolume {
        // Filesystem volume case
        // Mount/remount a volume when a volume is attached
        generatedOperations = oe.operationGenerator.GenerateMountVolumeFunc(
            waitForAttachTimeout, volumeToMount, actualStateOfWorld, isRemount)

    } else {
        // Block volume case
        // Creates a map to device if a volume is attached
        generatedOperations, err = oe.operationGenerator.GenerateMapVolumeFunc(
            waitForAttachTimeout, volumeToMount, actualStateOfWorld)
    }
    if err != nil {
        return err
    }
    // Avoid executing mount/map from multiple pods referencing the
    // same volume in parallel
    podName := nestedpendingoperations.EmptyUniquePodName

    // TODO: remove this -- not necessary
    if !volumeToMount.PluginIsAttachable && !volumeToMount.PluginIsDeviceMountable {
        // volume plugins which are Non-attachable and Non-deviceMountable can execute mount for multiple pods
        // referencing the same volume in parallel
        podName = util.GetUniquePodName(volumeToMount.Pod)
    }

    // TODO mount_device
    return oe.pendingOperations.Run(
        volumeToMount.VolumeName, podName, "" /* nodeName */, generatedOperations)
}


// generates the mount operation function
func (og *operationGenerator) GenerateMountVolumeFunc(
    waitForAttachTimeout time.Duration,
    volumeToMount VolumeToMount,
    actualStateOfWorld ActualStateOfWorldMounterUpdater,
    isRemount bool) volumetypes.GeneratedOperations {

    
// .......
    mountVolumeFunc := func() (error, error) {
        // Get mounter plugin
        volumePlugin, err := og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec)
        if err != nil || volumePlugin == nil {
            return volumeToMount.GenerateError("MountVolume.FindPluginBySpec failed", err)
        }

        affinityErr := checkNodeAffinity(og, volumeToMount)
        if affinityErr != nil {
            return volumeToMount.GenerateError("MountVolume.NodeAffinity check failed", affinityErr)
        }

        volumeMounter, newMounterErr := volumePlugin.NewMounter(
            volumeToMount.VolumeSpec,
            volumeToMount.Pod,
            volume.VolumeOptions{})


        // get a deviceMounter, if the plugin supports mounting devices
        deviceMountableVolumePlugin, _ := og.volumePluginMgr.FindDeviceMountablePluginBySpec(volumeToMount.VolumeSpec)
        var volumeDeviceMounter volume.DeviceMounter
        if deviceMountableVolumePlugin != nil {
            volumeDeviceMounter, _ = deviceMountableVolumePlugin.NewDeviceMounter()
        }
        if volumeDeviceMounter != nil {
            // Mount device to global mount path
            err = volumeDeviceMounter.MountDevice(
                volumeToMount.VolumeSpec,
                devicePath,
                deviceMountPath)
        }
        

        // SetUp prepares and mounts/unpacks the volume to a
        // self-determined directory path. The mount point and its
        // content should be owned by `fsUser` or 'fsGroup' so that it can be
        // accessed by the pod. This may be called more than once, so
        // implementations must be idempotent.
        // It could return following types of errors:
        //   - TransientOperationFailure
        //   - UncertainProgressError
        //   - Error of any other type should be considered a final error

        // Execute mount
        mountErr := volumeMounter.SetUp(volume.MounterArgs{
            FsUser:              util.FsUserFrom(volumeToMount.Pod),
            FsGroup:             fsGroup,
            DesiredSize:         volumeToMount.DesiredSizeLimit,
            FSGroupChangePolicy: fsGroupChangePolicy,
        })

        // Update actual state of world
        markOpts := MarkVolumeOpts{
            PodName:             volumeToMount.PodName,
            PodUID:              volumeToMount.Pod.UID,
            VolumeName:          volumeToMount.VolumeName,
            Mounter:             volumeMounter,
            OuterVolumeSpecName: volumeToMount.OuterVolumeSpecName,
            VolumeGidVolume:     volumeToMount.VolumeGidValue,
            VolumeSpec:          volumeToMount.VolumeSpec,
            VolumeMountState:    VolumeMounted,
        }
        // MarkVolumeAsMounted records the mount in attachedVolumes[volumeName].mountedPods:
        //   podObj.volumeMountStateForPod = markVolumeOpts.VolumeMountState
        //   asw.attachedVolumes[volumeName].mountedPods[podName] = podObj
        markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted(markOpts)
    }
    
// .......

// .......



}

NFS mount SetUp

  • The mount defaults to the system mount command
  • For NFS, the pluginName segment in each volume's mount directory path is kubernetes.io~nfs
  • The mount source is the NFS server's exportPath
  • The mount target is dir, i.e. the pod's NFS volume path nfsMounter.GetPath(); see the example below
source := fmt.Sprintf("%s:%s", nfsMounter.server, nfsMounter.exportPath)
err = nfsMounter.mounter.MountSensitiveWithoutSystemd(source, dir, "nfs", mountOptions, nil)

Example of the NFS volume mount path dir: /var/lib/kubelet/pods/{podid}/volumes/{pluginName}/{pvname}

# nfsMounter.GetPath()
# /var/lib/kubelet/pods/{podid}/volumes/{pluginName}/{pvname}
/var/lib/kubelet/pods/06d10daa-c7e8-46e5-b94a-c0fcd2f27a2e/volumes/kubernetes.io~nfs/pvc-1f9f7ceb-6ca8-453e-87a0-013e53841fad
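The path can be reproduced with the small helper below (my own sketch; the kubelet composes it via host.GetPodVolumeDir plus utilstrings.EscapeQualifiedName). The key detail is that the plugin name kubernetes.io/nfs is escaped by replacing "/" with "~" so it can be used as a directory name.

package main

import (
    "fmt"
    "path/filepath"
    "strings"
)

// podVolumeDir mimics how the per-pod volume directory is composed.
func podVolumeDir(rootDir, podUID, pluginName, volName string) string {
    escaped := strings.ReplaceAll(pluginName, "/", "~") // kubernetes.io/nfs -> kubernetes.io~nfs
    return filepath.Join(rootDir, "pods", podUID, "volumes", escaped, volName)
}

func main() {
    fmt.Println(podVolumeDir(
        "/var/lib/kubelet",
        "06d10daa-c7e8-46e5-b94a-c0fcd2f27a2e",
        "kubernetes.io/nfs",
        "pvc-1f9f7ceb-6ca8-453e-87a0-013e53841fad",
    ))
    // Output: /var/lib/kubelet/pods/06d10daa-.../volumes/kubernetes.io~nfs/pvc-1f9f...
}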

Mount handling




// SetUp attaches the disk and bind mounts to the volume path.
func (nfsMounter *nfsMounter) SetUp(mounterArgs volume.MounterArgs) error {
    return nfsMounter.SetUpAt(nfsMounter.GetPath(), mounterArgs)
}

func (nfsMounter *nfsMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
    notMnt, err := mount.IsNotMountPoint(nfsMounter.mounter, dir)
    klog.V(4).Infof("NFS mount set up: %s %v %v", dir, !notMnt, err)
    if err != nil && !os.IsNotExist(err) {
        return err
    }
    if !notMnt {
        return nil
    }
    if err := os.MkdirAll(dir, 0750); err != nil {
        return err
    }
    source := fmt.Sprintf("%s:%s", nfsMounter.server, nfsMounter.exportPath)
    options := []string{}
    if nfsMounter.readOnly {
        options = append(options, "ro")
    }
    mountOptions := util.JoinMountOptions(nfsMounter.mountOptions, options)
    err = nfsMounter.mounter.MountSensitiveWithoutSystemd(source, dir, "nfs", mountOptions, nil)
    if err != nil {
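        // Mount failed: detect and unwind any half-finished mount, then
        // remove dir so the next sync loop can retry from a clean state.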
        notMnt, mntErr := mount.IsNotMountPoint(nfsMounter.mounter, dir)
        if mntErr != nil {
            klog.Errorf("IsNotMountPoint check failed: %v", mntErr)
            return err
        }
        if !notMnt {
            if mntErr = nfsMounter.mounter.Unmount(dir); mntErr != nil {
                klog.Errorf("Failed to unmount: %v", mntErr)
                return err
            }
            notMnt, mntErr := mount.IsNotMountPoint(nfsMounter.mounter, dir)
            if mntErr != nil {
                klog.Errorf("IsNotMountPoint check failed: %v", mntErr)
                return err
            }
            if !notMnt {
                // This is very odd, we don't expect it.  We'll try again next sync loop.
                klog.Errorf("%s is still mounted, despite call to unmount().  Will try again next sync loop.", dir)
                return err
            }
        }
        os.Remove(dir)
        return err
    }
    return nil
}
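
Note how SetUpAt implements the idempotency that the Mounter interface demands: if dir is already a mount point it returns nil immediately, and when the mount fails it unmounts any half-finished mount and removes dir, so the next sync loop retries from a clean state.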



func (plugin *nfsPlugin) newMounterInternal(spec *volume.Spec, pod *v1.Pod, mounter mount.Interface) (volume.Mounter, error) {
    source, readOnly, err := getVolumeSource(spec)
    if err != nil {
        return nil, err
    }

    return &nfsMounter{
        nfs: &nfs{
            volName:         spec.Name(),
            mounter:         mounter,
            pod:             pod,
            plugin:          plugin,
            MetricsProvider: volume.NewMetricsStatFS(getPath(pod.UID, spec.Name(), plugin.host)),
        },
        server:       source.Server,
        exportPath:   source.Path,
        readOnly:     readOnly,
        mountOptions: util.MountOptionFromSpec(spec),
    }, nil
}
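
getVolumeSource resolves the NFSVolumeSource either from the inline pod volume or from the bound PersistentVolume; it looks roughly like this (sketched from the same nfs plugin file):

func getVolumeSource(spec *volume.Spec) (*v1.NFSVolumeSource, bool, error) {
    if spec.Volume != nil && spec.Volume.NFS != nil {
        return spec.Volume.NFS, spec.Volume.NFS.ReadOnly, nil
    } else if spec.PersistentVolume != nil &&
        spec.PersistentVolume.Spec.NFS != nil {
        return spec.PersistentVolume.Spec.NFS, spec.ReadOnly, nil
    }

    return nil, false, fmt.Errorf("Spec does not reference an NFS volume type")
}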

// Name returns the name of either Volume or PersistentVolume, one of which must not be nil.
func (spec *Spec) Name() string {
    switch {
    case spec.Volume != nil:
        return spec.Volume.Name
    case spec.PersistentVolume != nil:
        return spec.PersistentVolume.Name
    default:
        return ""
    }
}

// NFS volumes represent a bare host file or directory mount of an NFS export.
type nfs struct {
    volName string
    pod     *v1.Pod
    mounter mount.Interface
    plugin  *nfsPlugin
    volume.MetricsProvider
}

func (nfsVolume *nfs) GetPath() string {
    name := nfsPluginName
    // GetPodVolumeDir returns the absolute path a directory which
    // represents the named volume under the named plugin for the given
    // pod.  If the specified pod does not exist, the result of this call
    // might not exist.
    return nfsVolume.plugin.host.GetPodVolumeDir(nfsVolume.pod.UID, utilstrings.EscapeQualifiedName(name), nfsVolume.volName)
}
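
The kubernetes.io~nfs directory segment comes from EscapeQualifiedName, which replaces "/" with "~" so that a qualified plugin name is safe to use as a directory name. A minimal, self-contained illustration (the pod UID and PV name are just the example values from above; escapeQualifiedName here re-implements k8s.io/utils/strings.EscapeQualifiedName):

package main

import (
    "fmt"
    "path/filepath"
    "strings"
)

// escapeQualifiedName mirrors utilstrings.EscapeQualifiedName: it replaces
// "/" with "~" in a qualified resource name.
func escapeQualifiedName(name string) string {
    return strings.Replace(name, "/", "~", -1)
}

func main() {
    podUID := "06d10daa-c7e8-46e5-b94a-c0fcd2f27a2e"
    pluginDir := escapeQualifiedName("kubernetes.io/nfs") // kubernetes.io~nfs
    dir := filepath.Join("/var/lib/kubelet/pods", podUID, "volumes",
        pluginDir, "pvc-1f9f7ceb-6ca8-453e-87a0-013e53841fad")
    fmt.Println(dir)
    // Prints the same path layout as the example above.
}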

Kubelet SyncPod

SyncPod context

Let's first review the preparation flow before a pod's containers are created, paying attention to the volume-related steps.

Completing the preparation work before container creation (SyncPod). This method mainly does the following:

  • If this is a pod deletion, execute it immediately and return
  • Sync podStatus to kubelet.statusManager
  • Check whether the pod can run on this node, mainly permission checks (whether it may use host network mode, whether it may run privileged, etc.). If not permitted, delete the old local pod and return an error
  • Create the containerManager object, create the pod-level cgroup, and update the QoS-level cgroup
  • If it is a static pod, create or update the corresponding mirrorPod
  • Create the pod's data directories, which hold volume and plugin information; if PVs are defined, wait for all volume mounts to complete (the volumeManager does this in the background); if there are image pull secrets, fetch the corresponding secrets from the apiserver
  • Then call the kubelet.volumeManager component and wait until all external volumes the pod needs are ready
  • Call the container runtime's SyncPod method to perform the actual container creation. None of the above touches a concrete container; this method is the preparation that must complete before the pod entity (i.e., its containers) is created
func (kl *Kubelet) syncPod(o syncPodOptions) error {
    // pull out the required options
    pod := o.pod
    mirrorPod := o.mirrorPod
    podStatus := o.podStatus
    updateType := o.updateType

    // Is this a pod kill (deletion)?
    if updateType == kubetypes.SyncPodKill {
        ...
    }
    ...
    // Check whether the pod can run on this node
    runnable := kl.canRunPod(pod)
    if !runnable.Admit {
        ...
    }

    // Update the pod status
    kl.statusManager.SetPodStatus(pod, apiPodStatus)

    // If the pod should not be running, kill it directly
    if !runnable.Admit || pod.DeletionTimestamp != nil || apiPodStatus.Phase == v1.PodFailed {
        ...
    }

    // Network readiness check (host-network pods are exempt)
    if rs := kl.runtimeState.networkErrors(); len(rs) != 0 && !kubecontainer.IsHostNetworkPod(pod) {
        ...
    }

    pcm := kl.containerManager.NewPodContainerManager()
    if !kl.podIsTerminated(pod) {
        ...
        // Create and update the pod's cgroups
        if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {
            if !pcm.Exists(pod) {
                ...
            }
        }
    }

    // Create the corresponding mirror pod for a static pod
    if kubepod.IsStaticPod(pod) {
        ...
    }

    // Create the pod data directories
    if err := kl.makePodDataDirs(pod); err != nil {
        ...
    }

    // Mount volumes
    if !kl.podIsTerminated(pod) {
        if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
            ...
        }
    }

    // Get image pull secrets
    pullSecrets := kl.getPullSecretsForPod(pod)

    // Call containerRuntime's SyncPod to start creating containers
    result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
    kl.reasonCache.Update(pod.UID, result)
    if err := result.Error(); err != nil {
        ...
    }

    return nil
}

In the context above we saw kubelet's syncPod handling: while syncing a pod, it waits for the pod's volume attach and mount to complete.

func (kl *Kubelet) syncPod(o syncPodOptions) error {
    // Mount volumes
    if !kl.podIsTerminated(pod) {
        if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
            ...
        }
    }
}

WaitForAttachAndMount

func (vm *volumeManager) WaitForAttachAndMount(pod *v1.Pod) error {
    if pod == nil {
        return nil
    }
    // All of the pod's expected volumes
    expectedVolumes := getExpectedVolumes(pod)
    if len(expectedVolumes) == 0 {
        // No volumes to verify
        return nil
    }

    klog.V(3).Infof("Waiting for volumes to attach and mount for pod %q", format.Pod(pod))
    uniquePodName := util.GetUniquePodName(pod)

    // Some pods expect to have Setup called over and over again to update.
    // Remount plugins for which this is true. (Atomically updating volumes,
    // like Downward API, depend on this to update the contents of the volume).
    vm.desiredStateOfWorldPopulator.ReprocessPod(uniquePodName)

    err := wait.PollImmediate(
        podAttachAndMountRetryInterval,
        podAttachAndMountTimeout,
        vm.verifyVolumesMountedFunc(uniquePodName, expectedVolumes))

    if err != nil {
        unmountedVolumes :=
            vm.getUnmountedVolumes(uniquePodName, expectedVolumes)
        // Also get unattached volumes for error message
        unattachedVolumes :=
            vm.getUnattachedVolumes(expectedVolumes)
        // If no volumes remain unmounted, mounting actually completed successfully
        if len(unmountedVolumes) == 0 {
            return nil
        }

        return fmt.Errorf(
            "unmounted volumes=%v, unattached volumes=%v: %s",
            unmountedVolumes,
            unattachedVolumes,
            err)
    }

    klog.V(3).Infof("All volumes are attached and mounted for pod %q", format.Pod(pod))
    return nil
}
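
getExpectedVolumes, called at the top of WaitForAttachAndMount, collects every volume name the pod references, covering both filesystem mounts and raw block devices; in 1.20 it is roughly:

// getExpectedVolumes returns the volumes that must be mounted (or mapped,
// for block volumes) before volume setup for this pod counts as done.
func getExpectedVolumes(pod *v1.Pod) []string {
    mounts, devices := util.GetPodVolumeNames(pod)
    return mounts.Union(devices).UnsortedList()
}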

verifyVolumesMountedFunc

  • When the number of unmounted volumes is 0, mounting has completed successfully
  • UnmountedVolumes = expectedVolumes - mountedVolumes
// verifyVolumesMountedFunc returns a method that returns true when all expected
// volumes are mounted.
func (vm *volumeManager) verifyVolumesMountedFunc(podName types.UniquePodName, expectedVolumes []string) wait.ConditionFunc {
    return func() (done bool, err error) {
        if errs := vm.desiredStateOfWorld.PopPodErrors(podName); len(errs) > 0 {
            return true, errors.New(strings.Join(errs, "; "))
        }
        return len(vm.getUnmountedVolumes(podName, expectedVolumes)) == 0, nil
    }
}
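// Note: this ConditionFunc is driven by wait.PollImmediate (from
// k8s.io/apimachinery/pkg/util/wait), which evaluates it once immediately and
// then every podAttachAndMountRetryInterval until it returns true, returns an
// error, or podAttachAndMountTimeout elapses (PollImmediate then returns
// wait.ErrWaitTimeout).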

// getUnmountedVolumes fetches the current list of mounted volumes from
// the actual state of the world, and uses it to process the list of
// expectedVolumes. It returns a list of unmounted volumes.
// The list also includes volume that may be mounted in uncertain state.
func (vm *volumeManager) getUnmountedVolumes(podName types.UniquePodName, expectedVolumes []string) []string {
    mountedVolumes := sets.NewString()
    for _, mountedVolume := range vm.actualStateOfWorld.GetMountedVolumesForPod(podName) {
        // Volumes that are actually mounted
        mountedVolumes.Insert(mountedVolume.OuterVolumeSpecName)
    }
    // expectedVolumes is the full set of volumes the pod expects to be mounted
    // UnmountedVolumes = expectedVolumes - mountedVolumes
    return filterUnmountedVolumes(mountedVolumes, expectedVolumes)
}
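
The set difference itself is computed by filterUnmountedVolumes; in the same file it looks roughly like:

// filterUnmountedVolumes returns every element of expectedVolumes that is
// not present in mountedVolumes.
func filterUnmountedVolumes(mountedVolumes sets.String, expectedVolumes []string) []string {
    unmountedVolumes := []string{}
    for _, expectedVolume := range expectedVolumes {
        if !mountedVolumes.Has(expectedVolume) {
            unmountedVolumes = append(unmountedVolumes, expectedVolume)
        }
    }
    return unmountedVolumes
}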

References