当前位置 博文首页 > 良凯尔:external-attacher源码分析(1)-main方法与启动参数分析

    良凯尔:external-attacher源码分析(1)-main方法与启动参数分析

    作者:良凯尔 时间:2021-07-03 18:28

    ceph-csi分析-external-attacher源码分析。external-attacher属于external plugin中的一个,辅助csi plugin组件,共同完成了存储相关操作。external-attacher watch volumeAttachment对象,然后调用csi plugin来做attach/dettach操作,并修改volumeAttachment对象与pv对象。

    更多 ceph-csi 其他源码分析,请查看下面这篇博文:kubernetes ceph-csi分析目录导航

    摘要

    ceph-csi分析-external-attacher源码分析。external-attacher属于external plugin中的一个,辅助csi plugin组件,共同完成了存储相关操作。external-attacher watch volumeAttachment对象,然后调用csi plugin来做attach/dettach操作,并修改volumeAttachment对象与pv对象。

    基于tag v2.1.1

    https://github.com/kubernetes-csi/external-attacher/releases/tag/v2.1.1

    external-attacher

    external-attacher属于external plugin中的一个。下面我们先来回顾一下external plugin以及csi系统结构。

    external plugin

    external plugin包括了external-provisioner、external-attacher、external-resizer、external-snapshotter等,external plugin辅助csi plugin组件,共同完成了存储相关操作。external plugin负责watch pvc、volumeAttachment等对象,然后调用volume plugin来完成存储的相关操作。如external-provisioner watch pvc对象,然后调用csi plugin来创建存储,最后创建pv对象;external-attacher watch volumeAttachment对象,然后调用csi plugin来做attach/dettach操作,并修改volumeAttachment对象与pv对象;external-resizer watch pvc对象,然后调用csi plugin来做存储的扩容操作等。

    csi系统结构

    csi系统结构图.png

    external-attacher作用分析

    根据CSI plugin是否支持ControllerPublish/ControllerUnpublish操作,external-attacher的作用分为如下两种:
    (1)当CSI plugin不支持ControllerPublish/ControllerUnpublish操作时,AD controller(或kubelet的volume manager)创建VolumeAttachment对象后,external-attacher仅参与VolumeAttachment对象的修改,将attached属性值patch为true;而external-attacher对pv对象无任何同步处理操作。
    (2)当CSI plugin支持ControllerPublish/ControllerUnpublish操作时,external-attacher调用csi plugin(ControllerPublishVolume)进行存储的attach操作,然后更改VolumeAttachment对象,将attached属性值patch为true,并patch pv对象,增加该external-attacher相关的Finalizer;对于pv对象,external-attacher负责处理pv对象的finalizer,patch pv对象,去除该external-attacher相关的finalizer(该external-attacher执行attach操作时添加的finalizer)。

    源码分析

    external-attacher的源码分析分为两部分:
    (1)main方法以及启动参数分析;
    (2)核心处理逻辑分析。

    这篇博客先对external-attacher的main方法以及启动参数做分析,下篇博客将对external-attacher的核心处理逻辑进行源码分析。

    main

    main方法主要逻辑:
    (1)解析启动参数;
    (2)校验worker-threads配置;
    (3)根据配置建立clientset;
    (4)建立与csi plugin的grpcclient;
    (5)调用csi plugin的探测方法(探测cephcsi-rbd服务是否准备好),直至探测成功;
    (6)获取driver名称;
    (7)获取csi plugin提供的能力,判断是否提供Controller服务,当不支持时实例化TrivialHandler,当支持时进入下一步;
    (8)获取csi plugin提供的能力,判断是否支持ControllerPublish/ControllerUnpublish操作,支持则实例化CSIHandler,不支持则实例化TrivialHandler(CSIHandler与TrivialHandler在下一篇博客再做具体介绍,这里只要知道有这么两个东西就行);
    (9)ceph-csi不提供Controller attach服务,所以走NewTrivialHandler;
    (10)获取csi plugin提供的能力,判断是否提供ListVolumesPublishedNodes服务;
    (11)新建CSIAttachController;
    (12)定义run方法,方法中调用CSIAttachController的run方法;
    (13)进行高可用选主相关操作,并运行run方法。

    func main() {
        // 解析启动参数
    	klog.InitFlags(nil)
    	flag.Set("logtostderr", "true")
    	flag.Parse()
        
        // --version输出
    	if *showVersion {
    		fmt.Println(os.Args[0], version)
    		return
    	}
    	klog.Infof("Version: %s", version)
        
        // 根据配置建立clientset
    	// Create the client config. Use kubeconfig if given, otherwise assume in-cluster.
    	config, err := buildConfig(*kubeconfig)
    	if err != nil {
    		klog.Error(err.Error())
    		os.Exit(1)
    	}
        
        // 校验worker-threads配置
    	if *workerThreads == 0 {
    		klog.Error("option -worker-threads must be greater than zero")
    		os.Exit(1)
    	}
        
        // 根据配置建立clientset
    	clientset, err := kubernetes.NewForConfig(config)
    	if err != nil {
    		klog.Error(err.Error())
    		os.Exit(1)
    	}
    
    	factory := informers.NewSharedInformerFactory(clientset, *resync)
    	var handler controller.Handler
    	metricsManager := metrics.NewCSIMetricsManager("" /* driverName */)
        
        // 建立grpcclient
    	// Connect to CSI.
    	csiConn, err := connection.Connect(*csiAddress, metricsManager, connection.OnConnectionLoss(connection.ExitOnConnectionLoss()))
    	if err != nil {
    		klog.Error(err.Error())
    		os.Exit(1)
    	}
        
        // 进行grpc探测(探测cephcsi-rbd服务是否准备好),直至探测成功
    	err = rpc.ProbeForever(csiConn, *timeout)
    	if err != nil {
    		klog.Error(err.Error())
    		os.Exit(1)
    	}
        
        // 获取driver名称
    	// Find driver name.
    	ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
    	defer cancel()
    	csiAttacher, err := rpc.GetDriverName(ctx, csiConn)
    	if err != nil {
    		klog.Error(err.Error())
    		os.Exit(1)
    	}
    	klog.V(2).Infof("CSI driver name: %q", csiAttacher)
    	metricsManager.SetDriverName(csiAttacher)
    	metricsManager.StartMetricsEndpoint(*metricsAddress, *metricsPath)
        
        // 获取csi plugin提供的能力,判断是否提供Controller服务
    	supportsService, err := supportsPluginControllerService(ctx, csiConn)
    	if err != nil {
    		klog.Error(err.Error())
    		os.Exit(1)
    	}
    	if !supportsService {
    	    // 实例化TrivialHandler
    		handler = controller.NewTrivialHandler(clientset)
    		klog.V(2).Infof("CSI driver does not support Plugin Controller Service, using trivial handler")
    	} else {
    	    // 获取csi plugin提供的能力,判断是否提供Controller attach服务
    		// Find out if the driver supports attach/detach.
    		supportsAttach, supportsReadOnly, err := supportsControllerPublish(ctx, csiConn)
    		if err != nil {
    			klog.Error(err.Error())
    			os.Exit(1)
    		}
    		if supportsAttach {
    			pvLister := factory.Core().V1().PersistentVolumes().Lister()
    			nodeLister := factory.Core().V1().Nodes().Lister()
    			vaLister := factory.Storage().V1beta1().VolumeAttachments().Lister()
    			csiNodeLister := factory.Storage().V1beta1().CSINodes().Lister()
    			volAttacher := attacher.NewAttacher(csiConn)
    			CSIVolumeLister := attacher.NewVolumeLister(csiConn)
    			// 实例化CSIHandler
    			handler = controller.NewCSIHandler(clientset, csiAttacher, volAttacher, CSIVolumeLister, pvLister, nodeLister, csiNodeLister, vaLister, timeout, supportsReadOnly, csitrans.New())
    			klog.V(2).Infof("CSI driver supports ControllerPublishUnpublish, using real CSI handler")
    		} else {
    		    // 实例化TrivialHandler
    			handler = controller.NewTrivialHandler(clientset)
    			klog.V(2).Infof("CSI driver does not support ControllerPublishUnpublish, using trivial handler")
    		}
    	}
        
        // 获取csi plugin提供的能力,判断是否提供ListVolumesPublishedNodes服务
    	slvpn, err := supportsListVolumesPublishedNodes(ctx, csiConn)
    	if err != nil {
    		klog.Errorf("Failed to check if driver supports ListVolumesPublishedNodes, assuming it does not: %v", err)
    	}
    
    	if slvpn {
    		klog.V(2).Infof("CSI driver supports list volumes published nodes. Using capability to reconcile volume attachment objects with actual backend state")
    	}
        
        // 新建CSIAttachController
    	ctrl := controller.NewCSIAttachController(
    		clientset,
    		csiAttacher,
    		handler,
    		factory.Storage().V1beta1().VolumeAttachments(),
    		factory.Core().V1().PersistentVolumes(),
    		workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax),
    		workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax),
    		slvpn,
    		*reconcileSync,
    	)
        
        // 定义run方法,方法中调用CSIAttachController的run方法
    	run := func(ctx context.Context) {
    		stopCh := ctx.Done()
    		factory.Start(stopCh)
    		ctrl.Run(int(*workerThreads), stopCh)
    	}
        
        // 进行高可用选主相关操作,并运行run方法
    	if !*enableLeaderElection {
    		run(context.TODO())
    	} else {
    		// Name of config map with leader election lock
    		lockName := "external-attacher-leader-" + csiAttacher
    		le := leaderelection.NewLeaderElection(clientset, lockName, run)
    
    		if *leaderElectionNamespace != "" {
    			le.WithNamespace(*leaderElectionNamespace)
    		}
    
    		if err := le.Run(); err != nil {
    			klog.Fatalf("failed to initialize leader election: %v", err)
    		}
    	}
    }
    

    启动参数分析

    启动参数列表

    具体参考https://github.com/kubernetes-csi/external-attacher#command-line-options

    // Command line flags
    var (
    	kubeconfig    = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Required only when running out of cluster.")
    	resync        = flag.Duration("resync", 10*time.Minute, "Resync interval of the controller.")
    	csiAddress    = flag.String("csi-address", "/run/csi/socket", "Address of the CSI driver socket.")
    	showVersion   = flag.Bool("version", false, "Show version.")
    	timeout       = flag.Duration("timeout", 15*time.Second, "Timeout for waiting for attaching or detaching the volume.")
    	workerThreads = flag.Uint("worker-threads", 10, "Number of attacher worker threads")
    
    	retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed create volume or deletion. It doubles with each failure, up to retry-interval-max.")
    	retryIntervalMax   = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed create volume or deletion.")
    
    	enableLeaderElection    = flag.Bool("leader-election", false, "Enable leader election.")
    	leaderElectionNamespace = flag.String("leader-election-namespace", "", "Namespace where the leader election resource lives. Defaults to the pod namespace if not set.")
    
    	reconcileSync = flag.Duration("reconcile-sync", 1*time.Minute, "Resync interval of the VolumeAttachment reconciler.")
    
    	metricsAddress = flag.String("metrics-address", "", "The TCP network address where the prometheus metrics endpoint will listen (example: `:8080`). The default is empty string, which means metrics endpoint is disabled.")
    	metricsPath    = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.")
    )
    
    func main() {
    	klog.InitFlags(nil)
    	flag.Set("logtostderr", "true")
    	flag.Parse()
    ......
    

    external-attacher容器启动参数

        - args:
        - --v=5
        - --csi-address=$(ADDRESS)
        - --leader-election=true
        - --retry-interval-start=500ms
        env:
        - name: ADDRESS
          value: /csi/csi-provisioner.sock
        volumeMounts:
        - mountPath: /csi
          name: socket-dir
    volumes:
      - emptyDir:
          medium: Memory
        name: socket-dir
    
    

    下面是部分参数解析:

    v

    日志打印等级。

    csi-address

    csi plugin(controller server)暴露服务的socket地址。

    leader-election

    是否开启高可用选主(使用Leases)。

    retry-interval-start

    VA/PV对象同步失败后的重试时间间隔。

    retry-interval-max

    VA/PV对象同步失败后的最大重试时间间隔。

    timeout

    与CSI driver通信的超时时间,默认15秒。

    worker-threads

    同步VA/PV对象的工作线程数量,默认10。

    总结

    external-attacher属于external plugin中的一个。

    external-attacher作用分析

    根据CSI plugin是否支持ControllerPublish/ControllerUnpublish操作,external-attacher的作用分为如下两种:
    (1)当CSI plugin不支持ControllerPublish/ControllerUnpublish操作时,AD controller(或kubelet的volume manager)创建VolumeAttachment对象后,external-attacher仅参与VolumeAttachment对象的修改,将attached属性值patch为true;而external-attacher对pv对象无任何同步处理操作。
    (2)当CSI plugin支持ControllerPublish/ControllerUnpublish操作时,external-attacher调用csi plugin(ControllerPublishVolume)进行存储的attach操作,然后更改VolumeAttachment对象,将attached属性值patch为true,并patch pv对象,增加该external-attacher相关的Finalizer;对于pv对象,external-attacher负责处理pv对象的finalizer,patch pv对象,去除该external-attacher相关的finalizer(该external-attacher执行attach操作时添加的finalizer)。

    external-attacher与ceph-csi结合使用

    ceph-csi不支持ControllerPublish/ControllerUnpublish操作,所以external-attacher与ceph-csi 结合使用,external-attacher仅参与VolumeAttachment对象的修改,将attached属性值patch为true。

    bkbky
    下一篇:没有了