手记

kubernetes(k8s) csi 插件attach-detach流程

因为k8s csi plugin属于out-of-tree(树外)实现,所以k8s额外使用了辅助容器(sidecar)来与k8s组件通信。本篇主要分析卷在挂载/卸载时的代码调用流程。

kubernetes源码

以下代码都在k8s源码中:

https://github.com/kubernetes/kubernetes.git

CSI Plugin

首先看kubernetes代码中csi相关的代码。

path | usage
pkg/volume/csi | manager CSI plugin

csiPlugin源码

  • csiPlugin结构体

type csiPlugin struct {
  # host用来与kubelet对接
    host              volume.VolumeHost
    blockEnabled      bool
  # 用来列出对应节点的CSIDriver
    csiDriverLister   csilister.CSIDriverLister
  #用来调用infomer和CSIDriverLister
    csiDriverInformer csiinformer.CSIDriverInformer
}
  • 加载csiPlugin
    ProbeVolumePlugins函数是k8s组件用来加载对应的csiPlugin的函数,调用函数会返回一个空的csiPlugin.

// ProbeVolumePlugins returns implemented pluginsfunc ProbeVolumePlugins() []volume.VolumePlugin {
    p := &csiPlugin{
        host:         nil,
        blockEnabled: utilfeature.DefaultFeatureGate.Enabled(features.CSIBlockVolume),
    }    return []volume.VolumePlugin{p}
}
  • 初始化csiPlugin

# 传入对应的VolumeHostfunc (p *csiPlugin) Init(host volume.VolumeHost) error {
    p.host = host    if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
        csiClient := host.GetCSIClient()        if csiClient == nil {
            klog.Warning("The client for CSI Custom Resources is not available, skipping informer initialization")
        } else {            // Start informer for CSIDrivers.
            factory := csiapiinformer.NewSharedInformerFactory(csiClient, csiResyncPeriod)
            p.csiDriverInformer = factory.Csi().V1alpha1().CSIDrivers()
            p.csiDriverLister = p.csiDriverInformer.Lister()            go factory.Start(wait.NeverStop)
        }
    }    // Initializing csiDrivers map and label management channels
    csiDrivers = csiDriversStore{driversMap: map[string]csiDriver{}}
    nim = nodeinfomanager.NewNodeInfoManager(host.GetNodeName(), host)    // TODO(#70514) Init CSINodeInfo object if the CRD exists and create Driver
    // objects for migrated drivers.

    return nil}
  • NewAttacher函数

NewAttacher函数返回csiAttacher实例

func (p *csiPlugin) NewAttacher() (volume.Attacher, error) {
    k8s := p.host.GetKubeClient()    if k8s == nil {
        klog.Error(log("unable to get kubernetes client from host"))        return nil, errors.New("unable to get Kubernetes client")
    }    return &csiAttacher{
        plugin:        p,
        k8s:           k8s,
        waitSleepTime: 1 * time.Second,
    }, nil}
  • NewDetacher函数

同NewAttacher函数,也返回csiAttacher实例

  • NewMounter函数

返回csiMountMgr实例

func (p *csiPlugin) NewMounter(
    spec *volume.Spec,
    pod *api.Pod,
    _ volume.VolumeOptions) (volume.Mounter, error) {
    ...

    mounter := &csiMountMgr{
        plugin:       p,
        k8s:          k8s,
        spec:         spec,
        pod:          pod,
        podUID:       pod.UID,
        driverName:   csiDriverName(pvSource.Driver),
        volumeID:     pvSource.VolumeHandle,
        specVolumeID: spec.Name(),
        csiClient:    csi,
        readOnly:     readOnly,
    }
  ...    return mounter, nil}
  • NewUnmounter方法

同上,也返回csiMountMgr实例

csiAttacher

  • Attach

Attach方法负责创建VolumeAttachment

func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string, error) {
  ...

  # 创建VolumeAttachment
    node := string(nodeName)
    pvName := spec.PersistentVolume.GetName()
    attachID := getAttachmentName(csiSource.VolumeHandle, csiSource.Driver, node)

    attachment := &storage.VolumeAttachment{
        ObjectMeta: meta.ObjectMeta{
            Name: attachID,
        },
        Spec: storage.VolumeAttachmentSpec{
            NodeName: node,
            Attacher: csiSource.Driver,
            Source: storage.VolumeAttachmentSource{
                PersistentVolumeName: &pvName,
            },
        },
    }
  ...

  # 判断attach成功的条件是attachment.Status.Attached为True    if _, err := c.waitForVolumeAttachment(csiSource.VolumeHandle, attachID, csiTimeout); err != nil {        return "", err
    }

    klog.V(4).Info(log("attacher.Attach finished OK with VolumeAttachment object [%s]", attachID))    // TODO(71164): In 1.15, return empty devicePath
    return attachID, nil}
  • Detach

Detach方法负责删除VolumeAttachment

func (c *csiAttacher) Detach(volumeName string, nodeName types.NodeName) error {    // volumeName in format driverName<SEP>volumeHandle generated by plugin.GetVolumeName()
    if volumeName == "" {
        klog.Error(log("detacher.Detach missing value for parameter volumeName"))        return errors.New("missing expected parameter volumeName")
    }
    parts := strings.Split(volumeName, volNameSep)    if len(parts) != 2 {
        klog.Error(log("detacher.Detach insufficient info encoded in volumeName"))        return errors.New("volumeName missing expected data")
    }

    driverName := parts[0]
    volID := parts[1]
    attachID := getAttachmentName(volID, driverName, string(nodeName))
  # 删除VolumeAttachment    if err := c.k8s.StorageV1beta1().VolumeAttachments().Delete(attachID, nil); err != nil {        if apierrs.IsNotFound(err) {            // object deleted or never existed, done
            klog.V(4).Info(log("VolumeAttachment object [%v] for volume [%v] not found, object deleted", attachID, volID))            return nil
        }
        klog.Error(log("detacher.Detach failed to delete VolumeAttachment [%s]: %v", attachID, err))        return err
    }

    klog.V(4).Info(log("detacher deleted ok VolumeAttachment.ID=%s", attachID))
  # 等待VolumeAttachment被删除    return c.waitForVolumeDetachment(volID, attachID)
}

csiMountMgr

  • SetUp

SetUp方法调用SetUpAt方法,SetUpAt再调用csi driver的NodePublishVolume方法。

func (c *csiMountMgr) SetUp(fsGroup *int64) error {    return c.SetUpAt(c.GetPath(), fsGroup)
}
  • TearDown

TearDown方法调用TearDownAt方法,TearDownAt再调用csi driver的NodeUnpublishVolume方法。

func (c *csiMountMgr) TearDown() error {    return c.TearDownAt(c.GetPath())
}

kube-controller-manager源码

path | usage
pkg/controller | contains code for controllers
  • attachDetachController

kube-controller-manager用attachDetachController管理卷的attach和detach,主要逻辑在以下函数中:

path | usage
pkg/controller/volume/attachdetach/reconciler/reconciler.go | manager attach/detach volume
func (rc *reconciler) reconcile() {    // Detaches are triggered before attaches so that volumes referenced by
    // pods that are rescheduled to a different node are detached first.

    // Ensure volumes that should be detached are detached.
  # 遍历已经attach到节点上的卷
  # actualStateOfWorld代表实际的卷与节点的对应关系
  # desiredStateOfWorld代表定义的卷与节点及pod的对应关系    for _, attachedVolume := range
  rc.actualStateOfWorld.GetAttachedVolumes() {

    # 如果该卷不再需要,则进行detach        if !rc.desiredStateOfWorld.VolumeExists(
            attachedVolume.VolumeName, attachedVolume.NodeName) {

            ...
      # DetachVolume方法最终调用plugin的NewDetacher函数,最后调用返回
      # 的csiAttacher的Detach方法
            err = rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, verifySafeToDetach, rc.actualStateOfWorld)
      ...
            }
        }
    }

  # attach对应的卷
    rc.attachDesiredVolumes()    // Update Node Status
    err := rc.nodeStatusUpdater.UpdateNodeStatuses()    if err != nil {
        klog.Warningf("UpdateNodeStatuses failed with: %v", err)
    }
}

# attach卷func (rc *reconciler) attachDesiredVolumes() {    // Ensure volumes that should be attached are attached.
  # GetVolumesToAttach获取需要attach到节点上的卷    for _, volumeToAttach := range rc.desiredStateOfWorld.GetVolumesToAttach() {
    # 判断需求中的卷是否实际中已经存在    if rc.actualStateOfWorld.VolumeNodeExists(volumeToAttach.VolumeName, volumeToAttach.NodeName) {            // Volume/Node exists, touch it to reset detachRequestedTime
            if klog.V(5) {
                klog.Infof(volumeToAttach.GenerateMsgDetailed("Volume attached--touching", ""))
            }
            rc.actualStateOfWorld.ResetDetachRequestTime(volumeToAttach.VolumeName, volumeToAttach.NodeName)            continue
        }
        ...        // Volume/Node doesn't exist, spawn a goroutine to attach it
        if klog.V(5) {
            klog.Infof(volumeToAttach.GenerateMsgDetailed("Starting attacherDetacher.AttachVolume", ""))
        }
    # AttachVolume方法最终调用plugin的NewAttacher函数,最后调用返回
    # 的csiAttacher的Attach方法
        err := rc.attacherDetacher.AttachVolume(volumeToAttach.VolumeToAttach, rc.actualStateOfWorld)
        ...
        }
    }
}

kubelet源码

path | usage
pkg/kubelet | contains the libraries that drive the Kubelet binary
  • volumemanager

kubelet对卷的处理也在reconcile函数中,注意kubelet和kube-controller-manager都有各自的reconciler,actualStateOfWorld,desiredStateOfWorld定义,不要混淆。

path | usage
pkg/kubelet/volumemanager/reconciler/reconciler.go | Mainly used to manage the mounting and unmounting of volumes
func (rc *reconciler) reconcile() {    // Unmounts are triggered before mounts so that a volume that was
    // referenced by a pod that was deleted and is now referenced by another
    // pod is unmounted from the first pod before being mounted to the new
    // pod.

    // Ensure volumes that should be unmounted are unmounted.
  # GetMountedVolumes返回的是成功mount到pod上的卷    for _, mountedVolume := range rc.actualStateOfWorld.GetMountedVolumes() {
    # 如果实际挂载的卷不需要被挂载,卸载卷        if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) {            // Volume is mounted, unmount it
            klog.V(5).Infof(mountedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountVolume", ""))
      # UnmountVolume调用plugin的NewUnmounter创建实例,并调用TearDown方法
            err := rc.operationExecutor.UnmountVolume(
                mountedVolume.MountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
            ...
        }
    }

  # GetVolumesToMount返回需要attach到节点并挂载到pod上的卷    // Ensure volumes that should be attached/mounted are attached/mounted.
    for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
        volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
        volumeToMount.DevicePath = devicePath
    # 如果需要挂载的卷还没有被挂载        if cache.IsVolumeNotAttachedError(err) {
      # controllerAttachDetachEnabled为true的时候,一般为true
      #            if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {                // Volume is not attached (or doesn't implement attacher), kubelet attach is disabled, wait
                // for controller to finish attaching volume.
                klog.V(5).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.VerifyControllerAttachedVolume", ""))
        # 判断卷的状态是否是attach到node上
                err := rc.operationExecutor.VerifyControllerAttachedVolume(
                    volumeToMount.VolumeToMount,
                    rc.nodeName,
                    rc.actualStateOfWorld)
        ...
            } else {
        # 不使用controller的attach/detach controller,kubelet 直接调用plugin去attach                // Volume is not attached to node, kubelet attach is enabled, volume implements an attacher,
                // so attach it
                volumeToAttach := operationexecutor.VolumeToAttach{
                    VolumeName: volumeToMount.VolumeName,
                    VolumeSpec: volumeToMount.VolumeSpec,
                    NodeName:   rc.nodeName,
                }
                klog.V(5).Infof(volumeToAttach.GenerateMsgDetailed("Starting operationExecutor.AttachVolume", ""))
                err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)
                ...
            }
        } else if !volMounted || cache.IsRemountRequiredError(err) {
      # attach到node上的卷下次循环中会进入这个分支重新挂载
      # 重新挂载            // Volume is not mounted, or is already mounted, but requires remounting
            remountingLogStr := ""
            isRemount := cache.IsRemountRequiredError(err)            if isRemount {
                remountingLogStr = "Volume is already mounted to pod, but remount was requested."
            }
            klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MountVolume", remountingLogStr))
      #
            err := rc.operationExecutor.MountVolume(
                rc.waitForAttachTimeout,
                volumeToMount.VolumeToMount,
                rc.actualStateOfWorld,
                isRemount)
            ...
        } else if cache.IsFSResizeRequiredError(err) &&
      # volume need resize
            utilfeature.DefaultFeatureGate.Enabled(features.ExpandInUsePersistentVolumes) {
            klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.ExpandVolumeFSWithoutUnmounting", ""))
            err := rc.operationExecutor.ExpandVolumeFSWithoutUnmounting(
                volumeToMount.VolumeToMount,
                rc.actualStateOfWorld)
            ...
        }
    }

  # GetUnmountedVolumes返回的attach但是没有挂载到任何pod上的卷  // Ensure devices that should be detached/unmounted are detached/unmounted.
    for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() {        // Check IsOperationPending to avoid marking a volume as detached if it's in the process of mounting.
        if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) &&
            !rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName) {            if attachedVolume.GloballyMounted {                // Volume is globally mounted to device, unmount it
                klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", ""))
                err := rc.operationExecutor.UnmountDevice(
                    attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.mounter)
                ...
            } else {                // Volume is attached to node, detach it
                // Kubelet not responsible for detaching or this volume has a non-attachable volume plugin.
        # 等待controller detach                if rc.controllerAttachDetachEnabled || !attachedVolume.PluginIsAttachable {
                    rc.actualStateOfWorld.MarkVolumeAsDetached(attachedVolume.VolumeName, attachedVolume.NodeName)
                    klog.Infof(attachedVolume.GenerateMsgDetailed("Volume detached", fmt.Sprintf("DevicePath %q", attachedVolume.DevicePath)))
                } else {                    // Only detach if kubelet detach is enabled
                    klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.DetachVolume", ""))
                    err := rc.operationExecutor.DetachVolume(
                        attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld)
                    ...
                }
            }
        }
    }
}

external-attacher源码

https://github.com/kubernetes-csi/external-attacher.git

在attach/detach的时候external-attacher主要用来监控VolumeAttachment并调用csi driver中的相关方法

  • CSIAttachController

path | usage
pkg/controller/controller.go | attaches / detaches CSI volumes using provided Handler interface

这个controller负责监控VolumeAttachment并调用csi driver的相关方法。

// CSIAttachController is a controller that attaches / detaches CSI volumes using provided Handler interfacetype CSIAttachController struct {
    client        kubernetes.Interface
    attacherName  string
    handler       Handler
    eventRecorder record.EventRecorder
    vaQueue       workqueue.RateLimitingInterface
    pvQueue       workqueue.RateLimitingInterface

    vaLister       storagelisters.VolumeAttachmentLister
    vaListerSynced cache.InformerSynced
    pvLister       corelisters.PersistentVolumeLister
    pvListerSynced cache.InformerSynced
}
  • SyncNewOrUpdatedVolumeAttachment

根据VolumeAttachment的状态来调用csi driver,并更新VolumeAttachment的状态。

func (h *csiHandler) SyncNewOrUpdatedVolumeAttachment(va *storage.VolumeAttachment) {
    glog.V(4).Infof("CSIHandler: processing VA %q", va.Name)    var err error    if va.DeletionTimestamp == nil {
        err = h.syncAttach(va)
    } else {
        err = h.syncDetach(va)
    }
  ...
}
  • syncAttach

attach操作会调用csi driver的ControllerPublishVolume方法

func (h *csiHandler) syncAttach(va *storage.VolumeAttachment) error {    if va.Status.Attached {        // Volume is attached, there is nothing to be done.
        glog.V(4).Infof("%q is already attached", va.Name)        return nil
    }    // Attach and report any error
    glog.V(2).Infof("Attaching %q", va.Name)
    # csiAttach最终调用csi driver的ControllerPublishVolume方法
    va, metadata, err := h.csiAttach(va)
    ...
    glog.V(2).Infof("Attached %q", va.Name)    // Mark as attached
    if _, err := markAsAttached(h.client, va, metadata); err != nil {        return fmt.Errorf("failed to mark as attached: %s", err)
    }
    glog.V(4).Infof("Fully attached %q", va.Name)    return nil}
  • syncDetach

syncDetach会调用csi driver的ControllerUnpublishVolume,然后把VolumeAttachment的状态置为detach

结论

以这个卷的挂载流程来说明下各部分都是如何工作的:

   CreateVolume +------------+ DeleteVolume
 +------------->|  CREATED   +--------------+
 |              +---+----^---+              |
 |       Controller |    | Controller       v
+++         Publish |    | Unpublish       +++
|X|          Volume |    | Volume          | |
+-+             +---v----+---+             +-+
                | NODE_READY |
                +---+----^---+
               Node |    | Node
            Publish |    | Unpublish
             Volume |    | Volume
                +---v----+---+
                | PUBLISHED  |
                +------------+
  • 挂载卷的过程:

  1. kube-controller-manager调用csi driver plugin来创建VolumeAttachment

  2. external-attacher 监控VolumeAttachment并调用csi driver的ControllerPublishVolume方法,根据返回值更改VolumeAttachment的状态为attach

  3. kubelet 判断卷的状态是否为attach, 如果是则调用csi driver plugin的NodePublishVolume来进行挂载.

  • 卸载卷:

  1. kubelet 把未挂载到pod(pod被删除)上的卷调用csi driver plugin的NodeUnpublishVolume方法解绑

  2. kube-controller-manager 对attach到node但是没有使用的卷进行detach

  3. external-attacher调用csi driver plugin的ControllerUnpublishVolume进行detach

ps:

1.一个VolumeAttachment对应一个绑定关系,所以如果是单节点attach,那其他VolumeAttachment创建不成功

2.当kubelet服务出现问题的时候,此时k8s会调度删除此节点上的pod,但是删除的时候会因为kubelet出问题而卡住。

此时kube-controller-manager的desiredStateOfWorld中依然存在这个卷,对应的VolumeAttachment就不会被删除,卷也不会被重新挂载。

当kubelet重启的时候,pod会被彻底删除,kube-controller-manager会调用卷的detach,也就是detach首先完成。

而kubelet重新启动的时候会重新加载actualStateOfWorld和desiredStateOfWorld,
因为pod被删除,所以kubelet的actualStateOfWorld没有pod的挂载信息,后续不会对这个卷进行操作,导致上次的挂载信息依然存在。

         

             




作者:wangwDavid
链接:https://www.jianshu.com/p/5c6e78b6b320


1人推荐
随时随地看视频
慕课网APP