当前位置 博文首页 > zhonglinzhang:【kubernetes/k8s源码分析】 client-go包之Informer

    zhonglinzhang:【kubernetes/k8s源码分析】 client-go包之Informer

    作者:[db:作者] 时间:2021-08-04 15:03

    kubernetes v1.12:https://github.com/kubernetes/kubernetes

    Informer 简介

    Informer 是 client-go 中的一个核心工具包。如果 Kubernetes 的某个组件需要 List/Get Kubernetes 中的 Object(包括 pod、service 等),可以直接使用 Informer 实例中的 Lister() 方法(包含 Get 和 List 方法)。Informer 最基本的功能就是 List/Get Kubernetes 中的 Object,此外还可以监听事件并触发回调函数。

    • 调用 Informer 的 Lister() 方法(List/Get)时,Informer 不会去请求 Kubernetes API,而是查找缓存在本地内存中的数据(由 Informer 自己维护)。好处是 Informer 可以快速地返回结果,也减少了直接调用 Kubernetes API 引起的额外开销。
    • Informer 只会调用 Kubernetes 的 List 和 Watch API。Informer 在初始化时,先调用 Kubernetes List API 获得该资源的全部对象并缓存在内存中,然后调用 Watch API 去 watch 这种 resource,以维护这份缓存的一致性。
    • Informer 可以自定义回调函数 ResourceEventHandler,只需实现 OnAdd、OnUpdate、OnDelete 方法,分别对应 informer 监听到的创建、更新和删除事件。

    ?

    sharedInformerFactory结构体

    路径:client-go/informers/factory.go

    • customResync:每种 Informer 自定义的同步周期
    • tweakListOptions(类型为 TweakListOptionsFunc,即 func(*v1.ListOptions)):函数指针,用来调整 list 选项(第3章节讲解)
    • informers:缓存每一种 informer
    • startedInformers:用于跟踪哪些 informer 已经启动,使 Start() 可以被安全地多次调用
    // sharedInformerFactory holds the configuration and bookkeeping shared by every
    // informer created through one factory instance.
    type sharedInformerFactory struct {
    	// client is the clientset handed to the factory constructor.
    	client           kubernetes.Interface
    	// namespace defaults to v1.NamespaceAll in the constructor and can be
    	// overridden with the WithNamespace option.
    	namespace        string
    	// tweakListOptions is an optional hook installed by WithTweakListOptions.
    	tweakListOptions internalinterfaces.TweakListOptionsFunc
    	// lock is held by Start while it walks informers and updates startedInformers.
    	lock             sync.Mutex
    	// defaultResync is the resync period supplied to the constructor.
    	defaultResync    time.Duration
    	// customResync holds per-type resync periods registered by
    	// WithCustomResyncConfig, keyed by the reflect.Type of the object.
    	customResync     map[reflect.Type]time.Duration
    
    	// informers maps an object type to the shared informer created for it.
    	informers map[reflect.Type]cache.SharedIndexInformer
    	// startedInformers is used for tracking which informers have been started.
    	// This allows Start() to be called multiple times safely.
    	startedInformers map[reflect.Type]bool
    }

    ?

    SharedIndexInformer接口

    sharedIndexInformer 实现了该接口

    // SharedIndexInformer extends SharedInformer with access to, and configuration of,
    // the informer's Indexer.
    type SharedIndexInformer interface {
       SharedInformer
       // AddIndexers add indexers to the informer before it starts.
       AddIndexers(indexers Indexers) error
       // GetIndexer returns the Indexer associated with this informer.
       GetIndexer() Indexer
    }

    ?

    1. 初始化过程

    路径:cmd/kube-controller-manager/app/controllermanager.go

    • Run 方法 -> CreateControllerContext -> ctx.InformerFactory.Start

    ?

    2. CreateControllerContext函数

    // CreateControllerContext creates a context struct containing references to resources needed by the
    // controllers such as the cloud provider and clientBuilder. rootClientBuilder is only used for
    // the shared-informers client and token controller.
    func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) (ControllerContext, error) {
    	versionedClient := rootClientBuilder.ClientOrDie("shared-informers")

    2.1 NewSharedInformerFactoryWithOptions函数

    调用链:NewSharedInformerFactory -> NewSharedInformerFactoryWithOptions

    创建 sharedInformerFactory 实例

    // NewSharedInformerFactoryWithOptions builds a SharedInformerFactory for the given client
    // and default resync period, then lets every supplied option adjust it in order.
    func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
    	// Start from the defaults: namespace is v1.NamespaceAll and all bookkeeping maps are empty.
    	f := new(sharedInformerFactory)
    	f.client = client
    	f.namespace = v1.NamespaceAll
    	f.defaultResync = defaultResync
    	f.informers = map[reflect.Type]cache.SharedIndexInformer{}
    	f.startedInformers = map[reflect.Type]bool{}
    	f.customResync = map[reflect.Type]time.Duration{}
    
    	// Each option receives the current factory and returns the one to keep using.
    	for i := range options {
    		f = options[i](f)
    	}
    
    	return f
    }

    2.2 填充ControllerContext上下文

    初始化 SharedInformerFactory(即下面代码中的 sharedInformers;创建它的那一行在本摘录中被省略),并赋值到 ctx 上下文中

    	// Use a discovery client capable of being refreshed.
    	discoveryClient := rootClientBuilder.ClientOrDie("controller-discovery")
    	cachedClient := cacheddiscovery.NewMemCacheClient(discoveryClient.Discovery())
    	restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedClient)
    	go wait.Until(func() {
    		restMapper.Reset()
    	}, 30*time.Second, stop)
    
    	availableResources, err := GetAvailableResources(rootClientBuilder)
    	if err != nil {
    		return ControllerContext{}, err
    	}
    
    	cloud, loopMode, err := createCloudProvider(s.ComponentConfig.KubeCloudShared.CloudProvider.Name, s.ComponentConfig.KubeCloudShared.ExternalCloudVolumePlugin,
    		s.ComponentConfig.KubeCloudShared.CloudProvider.CloudConfigFile, s.ComponentConfig.KubeCloudShared.AllowUntaggedCloud, sharedInformers)
    	if err != nil {
    		return ControllerContext{}, err
    	}
    
    	ctx := ControllerContext{
    		ClientBuilder:      clientBuilder,
    		InformerFactory:    sharedInformers,
    		ComponentConfig:    s.ComponentConfig,
    		RESTMapper:         restMapper,
    		AvailableResources: availableResources,
    		Cloud:              cloud,
    		LoopMode:           loopMode,
    		Stop:               stop,
    		InformersStarted:   make(chan struct{}),
    		ResyncPeriod:       ResyncPeriod(s),
    	}

    ?

    3. TweakListOptionsFunc

    可调整的参数有 namespace、tweakListOptions、customResync,其中 tweakListOptions 用于设置 list 过滤选项

    // WithCustomResyncConfig returns an option that records a custom resync period for
    // each entry of resyncConfig, keyed by the reflect.Type of the entry's object.
    func WithCustomResyncConfig(resyncConfig map[v1.Object]time.Duration) SharedInformerOption {
    	apply := func(f *sharedInformerFactory) *sharedInformerFactory {
    		for obj, period := range resyncConfig {
    			f.customResync[reflect.TypeOf(obj)] = period
    		}
    		return f
    	}
    	return apply
    }
    
    // WithTweakListOptions returns an option that installs tweakListOptions as the
    // factory-wide list-options filter.
    func WithTweakListOptions(tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerOption {
    	install := func(f *sharedInformerFactory) *sharedInformerFactory {
    		f.tweakListOptions = tweakListOptions
    		return f
    	}
    	return install
    }
    
    // WithNamespace returns an option that sets the factory's namespace to the given value.
    func WithNamespace(namespace string) SharedInformerOption {
    	restrict := func(f *sharedInformerFactory) *sharedInformerFactory {
    		f.namespace = namespace
    		return f
    	}
    	return restrict
    }

    ?

    4. sharedInformerFactory Start方法

    Run 函数中调用 SharedInformerFactory 的 Start 方法(controllerContext.InformerFactory.Start(controllerContext.Stop))

    // Start launches, each in its own goroutine, every registered informer that has not
    // been started yet, and marks it as started so repeated calls do not launch it again.
    func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
    	f.lock.Lock()
    	defer f.lock.Unlock()
    
    	for typ, inf := range f.informers {
    		if f.startedInformers[typ] {
    			// Already running from an earlier Start call.
    			continue
    		}
    		go inf.Run(stopCh)
    		f.startedInformers[typ] = true
    	}
    }

    4.1 sharedIndexInformer结构体

    路径:client-go/tools/cache/shared_informer.go

    // sharedIndexInformer bundles the local store, the controller that feeds it, and the
    // processor that fans change notifications out to registered handlers.
    type sharedIndexInformer struct {
    	// indexer is the local store that HandleDeltas reads from and writes to.
    	indexer    Indexer
    	// controller is created late, inside Run, while startedLock is held.
    	controller Controller
    
    	// processor distributes add/update/delete notifications to the listeners.
    	processor             *sharedProcessor
    	// cacheMutationDetector is told about every object that HandleDeltas adds or updates.
    	cacheMutationDetector CacheMutationDetector
    
    	// This block is tracked to handle late initialization of the controller
    	listerWatcher ListerWatcher
    	objectType    runtime.Object
    
    	// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
    	// shouldResync to check if any of our listeners need a resync.
    	resyncCheckPeriod time.Duration
    	// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
    	// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
    	// value).
    	defaultEventHandlerResyncPeriod time.Duration
    	// clock allows for testability
    	clock clock.Clock
    
    	// started is set to true in Run once the controller has been created;
    	// startedLock is held around that assignment.
    	started, stopped bool
    	startedLock      sync.Mutex
    
    	// blockDeltas gives a way to stop all event distribution so that a late event handler
    	// can safely join the shared informer.
    	blockDeltas sync.Mutex
    }

    4.2 sharedIndexInformer Run函数

    路径:client-go/tools/cache/shared_informer.go

    • 初始化 DeltaFIFO,创建 queue
    • s.cacheMutationDetector.Run 检查缓存对象是否发生变化
    • processorStopCh、s.processor.run:调用 Listener.run 和 Listener.pop,处理 queue
    • s.controller.Run(stopCh) 构建 Reflector,先 list 然后再 watch
    func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    	defer utilruntime.HandleCrash()

    4.2.1 NewDeltaFIFO函数

    路径:client-go/tools/cache/delta_fifo.go,第5章节讲解 DeltaFIFO

    • DeltaFIFO 可以理解为生产者-消费者队列,Reflector 为生产者
    // NewDeltaFIFO returns an empty DeltaFIFO wired with the given key function,
    // delta compressor and known-objects lookup.
    func NewDeltaFIFO(keyFunc KeyFunc, compressor DeltaCompressor, knownObjects KeyListerGetter) *DeltaFIFO {
    	fifo := new(DeltaFIFO)
    	fifo.items = make(map[string]Deltas)
    	fifo.queue = make([]string, 0)
    	fifo.keyFunc = keyFunc
    	fifo.deltaCompressor = compressor
    	fifo.knownObjects = knownObjects
    	// The condition variable shares the FIFO's own lock.
    	fifo.cond.L = &fifo.lock
    	return fifo
    }

    4.2.2 创建Config实例

    包括 DeltaFIFO 队列、list watch、处理函数 HandleDeltas 等,并创建 controller 实例

    	cfg := &Config{
    		Queue:            fifo,
    		ListerWatcher:    s.listerWatcher,
    		ObjectType:       s.objectType,
    		FullResyncPeriod: s.resyncCheckPeriod,
    		RetryOnError:     false,
    		ShouldResync:     s.processor.shouldResync,
    
    		Process: s.HandleDeltas,
    	}
    
    	func() {
    		s.startedLock.Lock()
    		defer s.startedLock.Unlock()
    
    		s.controller = New(cfg)
    		s.controller.(*controller).clock = s.clock
    		s.started = true
    	}()

    4.2.3 CacheMutationDetector接口Run方法

    defaultCacheMutationDetector 实现了该接口

    // Run calls CompareObjects immediately and then again after every d.period,
    // returning as soon as stopCh fires.
    func (d *defaultCacheMutationDetector) Run(stopCh <-chan struct{}) {
    	// we DON'T want protection from panics.  If we're running this code, we want to die
    	d.CompareObjects()
    	for {
    		// A fresh timer per round: the period is measured from the end of the last comparison.
    		tick := time.After(d.period)
    		select {
    		case <-stopCh:
    			return
    		case <-tick:
    			d.CompareObjects()
    		}
    	}
    }

    4.2.4 sharedProcessor 的 run 方法

    调用 Listener.run 和 Listener.pop,处理 queue

    // run starts the run and pop loops of every registered listener, blocks until stopCh
    // fires, then closes each listener's addCh and waits for all started loops to finish.
    func (p *sharedProcessor) run(stopCh <-chan struct{}) {
    	// The closure scopes the read lock so it is released before blocking on stopCh.
    	func() {
    		p.listenersLock.RLock()
    		defer p.listenersLock.RUnlock()
    		for _, listener := range p.listeners {
    			p.wg.Start(listener.run)
    			p.wg.Start(listener.pop)
    		}
    	}()
    	<-stopCh
    	// Shutdown: the read lock is held for the rest of the function, including the wait.
    	p.listenersLock.RLock()
    	defer p.listenersLock.RUnlock()
    	for _, listener := range p.listeners {
    		close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
    	}
    	p.wg.Wait() // Wait for all .pop() and .run() to stop
    }
    

    4.2.4.1 listener run方法

    // run delivers every notification received on p.nextCh to the matching handler
    // callback (OnUpdate, OnAdd or OnDelete) and returns once p.nextCh has been closed
    // and drained.
    func (p *processorListener) run() {
    	// this call blocks until the channel is closed.  When a panic happens during the notification
    	// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
    	// the next notification will be attempted.  This is usually better than the alternative of never
    	// delivering again.
    	// stopCh is private to this call: closing it is how the delivery loop tells
    	// wait.Until to stop re-invoking the function below.
    	stopCh := make(chan struct{})
    	wait.Until(func() {
    		// this gives us a few quick retries before a long pause and then a few more quick retries
    		err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
    			for next := range p.nextCh {
    				// Dispatch on the concrete notification type.
    				switch notification := next.(type) {
    				case updateNotification:
    					p.handler.OnUpdate(notification.oldObj, notification.newObj)
    				case addNotification:
    					p.handler.OnAdd(notification.newObj)
    				case deleteNotification:
    					p.handler.OnDelete(notification.oldObj)
    				default:
    					utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
    				}
    			}
    			// the only way to get here is if the p.nextCh is empty and closed
    			return true, nil
    		})
    
    		// the only way to get here is if the p.nextCh is empty and closed
    		if err == nil {
    			close(stopCh)
    		}
    	}, 1*time.Minute, stopCh)
    }
    

    4.2.4.2 controller Run函数

    路径:client-go/tools/cache/controller.go

    初始化 Reflector 实例(第6章节讲解)

    // Run begins processing items, and will continue until stopCh is closed.
    // It's an error to call Run more than once.
    // Run blocks; call via go.
    func (c *controller) Run(stopCh <-chan struct{}) {
    	defer utilruntime.HandleCrash()
    	// Close the queue as soon as a stop is requested.
    	go func() {
    		<-stopCh
    		c.config.Queue.Close()
    	}()
    	// Build the reflector from the lister/watcher, object type, queue and resync
    	// period held in the controller's config.
    	r := NewReflector(
    		c.config.ListerWatcher,
    		c.config.ObjectType,
    		c.config.Queue,
    		c.config.FullResyncPeriod,
    	)
    	r.ShouldResync = c.config.ShouldResync
    	r.clock = c.clock
    
    	// Publish the reflector under reflectorMutex.
    	c.reflectorMutex.Lock()
    	c.reflector = r
    	c.reflectorMutex.Unlock()
    
    	// Run does not return until everything started through wg has finished.
    	var wg wait.Group
    	defer wg.Wait()
    
    	// Start the reflector via the wait group, handing it stopCh.
    	wg.StartWithChannel(stopCh, r.Run)
    
    	// Drive processLoop with a one-second period until stopCh fires.
    	wait.Until(c.processLoop, time.Second, stopCh)
    }

    ?

    ?

    2.3.1 调用listener.run方法

    主要是根据通知类型 updateNotification、addNotification、deleteNotification,分别调用对应的处理函数(OnUpdate、OnAdd、OnDelete)

    // run forwards each notification arriving on p.nextCh to the matching handler
    // callback and returns once p.nextCh has been closed and drained.
    func (p *processorListener) run() {
    	defer utilruntime.HandleCrash()
    
    	for {
    		next, open := <-p.nextCh
    		if !open {
    			return
    		}
    		switch n := next.(type) {
    		case updateNotification:
    			p.handler.OnUpdate(n.oldObj, n.newObj)
    		case addNotification:
    			p.handler.OnAdd(n.newObj)
    		case deleteNotification:
    			p.handler.OnDelete(n.oldObj)
    		default:
    			// Anything else is reported and skipped.
    			utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
    		}
    	}
    }
    

    2.3.3 调用listener.pop方法

    // pop moves notifications from p.addCh to p.nextCh, parking them in
    // p.pendingNotifications while an earlier one is still waiting to be sent.
    // It returns when p.addCh is closed, closing p.nextCh on the way out.
    func (p *processorListener) pop() {
    	defer utilruntime.HandleCrash()
    	defer close(p.nextCh) // Tell .run() to stop
    
    	// nextCh stays nil while nothing is waiting to be sent; a send on a nil channel
    	// never proceeds, so the first select case is effectively switched off.
    	var nextCh chan<- interface{}
    	// notification is the single item currently on offer to nextCh.
    	var notification interface{}
    	for {
    		select {
    		case nextCh <- notification:
    			// Notification dispatched
    			var ok bool
    			// Promote the next buffered item, if any, to be the one on offer.
    			// NOTE(review): the nil check in the other case relies on ReadOne yielding a
    			// nil value when ok is false — confirm against pendingNotifications.
    			notification, ok = p.pendingNotifications.ReadOne()
    			if !ok { // Nothing to pop
    				nextCh = nil // Disable this select case
    			}
    		case notificationToAdd, ok := <-p.addCh:
    			if !ok {
    				// addCh was closed: stop; the deferred close of p.nextCh runs now.
    				return
    			}
    			if notification == nil { // No notification to pop (and pendingNotifications is empty)
    				// Optimize the case - skip adding to pendingNotifications
    				notification = notificationToAdd
    				nextCh = p.nextCh
    			} else { // There is already a notification waiting to be dispatched
    				p.pendingNotifications.WriteOne(notificationToAdd)
    			}
    		}
    	}
    }

    2.4 controller的Run函数

    • 主要是生成 Reflector 对象,并调用其 Run 方法
    • 循环执行 processLoop 函数
    // Run builds the reflector, starts it, and then drives processLoop until stopCh is
    // closed. It blocks until then.
    func (c *controller) Run(stopCh <-chan struct{}) {
    	defer utilruntime.HandleCrash()
    	// Close the queue as soon as a stop is requested.
    	go func() {
    		<-stopCh
    		c.config.Queue.Close()
    	}()
    	// Build the reflector from the lister/watcher, object type, queue and resync
    	// period held in the controller's config.
    	r := NewReflector(
    		c.config.ListerWatcher,
    		c.config.ObjectType,
    		c.config.Queue,
    		c.config.FullResyncPeriod,
    	)
    	r.ShouldResync = c.config.ShouldResync
    	r.clock = c.clock
    
    	// Publish the reflector under reflectorMutex.
    	c.reflectorMutex.Lock()
    	c.reflector = r
    	c.reflectorMutex.Unlock()
    
    	// Run does not return until everything started through wg has finished.
    	var wg wait.Group
    	defer wg.Wait()
    
    	// Start the reflector via the wait group, handing it stopCh.
    	wg.StartWithChannel(stopCh, r.Run)
    
    	// Drive processLoop with a one-second period until stopCh fires.
    	wait.Until(c.processLoop, time.Second, stopCh)
    }

    2.4.1 Reflector的Run函数

    Reflector 主要对接 Kubernetes API,调用 list/watch 方法来更新缓存(先 list 所有对象,再进行 watch 操作)

    // Run keeps calling ListAndWatch, pausing r.period between attempts, and reports
    // any error it returns. It stops once stopCh is closed.
    func (r *Reflector) Run(stopCh <-chan struct{}) {
    	glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
    
    	// One list-and-watch attempt; failures are reported, not returned.
    	attempt := func() {
    		err := r.ListAndWatch(stopCh)
    		if err != nil {
    			utilruntime.HandleError(err)
    		}
    	}
    	wait.Until(attempt, r.period, stopCh)
    }

    2.4.2 processLoop 主要调用 HandleDeltas

    // HandleDeltas applies a batch of deltas (obj must be a Deltas value) to the indexer
    // in order and hands a matching notification to the processor for each one.
    // It returns the first error reported by the indexer; deltas after the failing one
    // are not processed.
    func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    	// Held for the whole batch.
    	s.blockDeltas.Lock()
    	defer s.blockDeltas.Unlock()
    
    	// from oldest to newest
    	for _, d := range obj.(Deltas) {
    		switch d.Type {
    		case Sync, Added, Updated:
    			isSync := d.Type == Sync
    			s.cacheMutationDetector.AddObject(d.Object)
    			// Already in the indexer: update it and notify with both the old and new object.
    			// Otherwise (not found, or the lookup failed): add it and send an add notification.
    			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
    				if err := s.indexer.Update(d.Object); err != nil {
    					return err
    				}
    				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
    			} else {
    				if err := s.indexer.Add(d.Object); err != nil {
    					return err
    				}
    				s.processor.distribute(addNotification{newObj: d.Object}, isSync)
    			}
    		case Deleted:
    			if err := s.indexer.Delete(d.Object); err != nil {
    				return err
    			}
    			// Delete notifications are never flagged as sync.
    			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
    		}
    	}
    	return nil
    }

    ?

    5. DeltaFIFO

    5.1 DeltaFIFO结构体

    生产者-消费者队列,用来存储 Watch API 返回的各种事件

    • queue:实现先入先出功能,存储对象的键(key)
    • items:kv 存储方式,以对象的键为索引保存该对象的 Deltas
    // DeltaFIFO pairs a FIFO of object keys (queue) with the Deltas accumulated for
    // each key (items).
    //
    // NOTE(review): NewDeltaFIFO as shown elsewhere in this document also sets
    // deltaCompressor, cond and lock, which are not declared below — this listing
    // appears to be abridged or taken from a different revision; confirm.
    type DeltaFIFO struct {
    	// We depend on the property that items in the set are in
    	// the queue and vice versa, and that all Deltas in this
    	// map have at least one Delta.
    	items map[string]Deltas
    	queue []string
    
    	// populated is true if the first batch of items inserted by Replace() has been populated
    	// or Delete/Add/Update was called first.
    	populated bool
    	// initialPopulationCount is the number of items inserted by the first call of Replace()
    	initialPopulationCount int
    
    	// keyFunc is used to make the key used for queued item
    	// insertion and retrieval, and should be deterministic.
    	keyFunc KeyFunc
    
    	// knownObjects list keys that are "known", for the
    	// purpose of figuring out which items have been deleted
    	// when Replace() or Delete() is called.
    	knownObjects KeyListerGetter
    }

    5.2 DeltaFIFO方法

    • func (f *DeltaFIFO) Add(obj interface{}) error
    • func (f *DeltaFIFO) Update(obj interface{}) error
    • func (f *DeltaFIFO) List() []interface{}
    • func (f *DeltaFIFO) Delete(obj interface{}) error
    • func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error)
    • func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error)

    ? ??

    6. Reflector

    6.1 Reflector结构体

    路径:client-go/tools/cache/reflector.go

    监听功能由 controller 中的 Reflector 实现,其主要作用是通过 Kubernetes Watch API 监听某种 resource 下的所有事件

    // Reflector watches a specified resource and causes all changes to be reflected in the given store.
    //
    // NOTE(review): this listing is abridged. The methods shown in this document also
    // reference resyncPeriod, ShouldResync and clock, which are not declared here.
    type Reflector struct {
    	// name identifies this reflector. By default it will be a file:line if possible.
    	name string
    	// metrics tracks basic metric information about the reflector
    	metrics *reflectorMetrics
    
    	// The type of object we expect to place in the store.
    	expectedType reflect.Type
    	// The destination to sync up with the watch source
    	store Store
    	// listerWatcher is used to perform lists and watches.
    	listerWatcher ListerWatcher
    	// period controls timing between one watch ending and
    	// the beginning of the next one. Run passes it to wait.Until as the
    	// interval between ListAndWatch attempts.
    	period time.Duration
    }

    6.2 Reflector Run方法

    开启 List/Watch 机制

    // Run keeps calling ListAndWatch, pausing r.period between attempts, and reports
    // any error it returns. It stops once stopCh is closed.
    func (r *Reflector) Run(stopCh <-chan struct{}) {
    	glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
    
    	// One list-and-watch attempt; failures are reported, not returned.
    	attempt := func() {
    		err := r.ListAndWatch(stopCh)
    		if err != nil {
    			utilruntime.HandleError(err)
    		}
    	}
    	wait.Until(attempt, r.period, stopCh)
    }

    ?

    7. ListAndWatch函数

    先 list 所有 item 并取得当时的资源版本(resourceVersion),然后从该版本开始 watch

    // ListAndWatch first lists all items and get the resource version at the moment of call,
    // and then use the resource version to watch.
    // It returns error if ListAndWatch didn't even try to initialize watch.
    func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    	glog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name)
    	var resourceVersion string

    7.1 syncWith

    使用 DeltaFIFO 的 Replace 方法进行全量替换

    // syncWith hands the listed items, together with their resource version, to the
    // store's Replace method so the store's contents are swapped for this list.
    func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
    	// Replace takes []interface{}, so each item is re-boxed into a slice of that type.
    	boxed := make([]interface{}, len(items))
    	for i := range items {
    		boxed[i] = items[i]
    	}
    	return r.store.Replace(boxed, resourceVersion)
    }

    7.2 watchHandler

    监听资源新版本的增量变化

    // watchHandler watches w and keeps *resourceVersion up to date.
    func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
    	start := r.clock.Now()
    	eventCount := 0
    
    	// Stopping the watcher should be idempotent and if we return from this function there's no way
    	// we're coming back in with the same watch interface.
    	defer w.Stop()
    	// update metrics
    	defer func() {
    		r.metrics.numberOfItemsInWatch.Observe(float64(eventCount))
    		r.metrics.watchDuration.Observe(time.Since(start).Seconds())
    	}()

    7.2.1 从 chan 中读取事件,并使用反射机制校验对象类型

    	for {
    		select {
    		case <-stopCh:
    			return errorStopRequested
    		case err := <-errc:
    			return err
    		case event, ok := <-w.ResultChan():
    			if !ok {
    				break loop
    			}
    			if event.Type == watch.Error {
    				return apierrs.FromObject(event.Object)
    			}
    			if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a {
    				utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
    				continue
    			}
    			meta, err := meta.Accessor(event.Object)
    			if err != nil {
    				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
    				continue
    			}
    			newResourceVersion := meta.GetResourceVersion()

    7.2.2 根据事件类型调用 DeltaFIFO 的相应方法,然后更新并记录当前的资源版本;也就是说,下次 watch 将从这个版本之后开始

    			switch event.Type {
    			case watch.Added:
    				err := r.store.Add(event.Object)
    				if err != nil {
    					utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
    				}
    			case watch.Modified:
    				err := r.store.Update(event.Object)
    				if err != nil {
    					utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
    				}
    			case watch.Deleted:
    				// TODO: Will any consumers need access to the "last known
    				// state", which is passed in event.Object? If so, may need
    				// to change this.
    				err := r.store.Delete(event.Object)
    				if err != nil {
    					utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
    				}
    			default:
    				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
    			}
    			*resourceVersion = newResourceVersion
    			r.setLastSyncResourceVersion(newResourceVersion)

    ?

    ?

    cs