Fading Coder

One Final Commit for the Last Sprint

Home > Notes > Content

Deep Dive into Kubernetes client-go Informer Mechanism

Notes May 17 2

Controller Architectuer

Controller serves as the core component for initializing and managing the Informer lifecycle. Its structure is defined as follows:

type controller struct {
    config         Config
    reflector      *Reflector
    reflectorMutex sync.RWMutex
    clock          clock.Clock
}

The controller interface exposes essential methods:

type Controller interface {
    Run(stopCh <-chan struct{})
    HasSynced() bool
    LastSyncResourceVersion() string
}

The Run() method handles two primary tasks:

  1. Initializes the Reflector to synchronize object events into DeltaFIFO
  2. Processes objects popped from DeltaFIFO using the configured ProcessFunc

Initialization

Controller initialization is straightforward:

func New(c *Config) Controller {
    return &controller{
        config: *c,
        clock: &clock.RealClock{},
    }
}

In practice, SharedIndexInformer initializes the controller during its Run() method:

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ /*...*/ })
    
    cfg := &Config{
        Queue:            fifo,
        ListerWatcher:    s.listerWatcher,
        ObjectType:       s.objectType,
        FullResyncPeriod: s.resyncCheckPeriod,
        Process:          s.HandleDeltas,
        // ...
    }
    
    s.controller = New(cfg)
    s.controller.Run(stopCh)
}

Execution Flow

When started, the controller performs:

func (c *controller) Run(stopCh <-chan struct{}) {
    r := NewReflector( // Initialize Reflector with config
        c.config.ListerWatcher,
        c.config.ObjectType,
        c.config.Queue,
        c.config.FullResyncPeriod,
    )
    // Configure reflector properties
    
    go r.Run(stopCh) // Start reflector
    
    wait.Until(c.processLoop, time.Second, stopCh)
}

The processLoop() continuously pops objects from DeltaFIFO:

func (c *controller) processLoop() {
    for {
        obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
        // Handle errors and requeue if necessary
    }
}

Event Processing via HandleDeltas

SharedIndexInformer's HandleDeltas() method processes Delta objects:

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    deltas := obj.(Deltas)
    for _, delta := range deltas {
        switch delta.Type {
        case Sync, Replaced, Added, Updated:
            if existing, exists, err := s.indexer.Get(delta.Object); err == nil && exists {
                s.indexer.Update(delta.Object) // Update cache
                s.processor.distribute(updateNotification{ /*...*/ }, false)
            } else {
                s.indexer.Add(delta.Object) // Add to cache
                s.processor.distribute(addNotification{ /*...*/ }, false)
            }
        case Deleted:
            s.indexer.Delete(delta.Object) // Remove from cache
            s.processor.distribute(deleteNotification{ /*...*/ }, false)
        }
    }
    return nil
}

SharedIndexInformer Architecture

SharedIndexInformer implements enhanced event distribution:

type sharedIndexInformer struct {
    indexer    Indexer
    controller Controller
    processor *sharedProcessor
    // ...
}

Processor Mechanism

sharedProcessor manages multiple processorListener instances:

type sharedProcessor struct {
    listeners        []*processorListener
    syncingListeners []*processorListener
    // ...
}

Each processorListener routes notifications to its ResourceEventHandler:

type processorListener struct {
    nextCh chan interface{}
    addCh  chan interface{}
    handler ResourceEventHandler
    // ...
}

Key methods:

  • run(): Dispatches events to handler methods
  • add(): Enqueues incoming notifications
  • pop(): Moves notifications from buffer to disptach channel

SharedIndexInformer Execution

The Run() method coordinates components:

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    fifo := NewDeltaFIFOWithOptions( /*...*/ )
    cfg := &Config{ /*...*/ }
    s.controller = New(cfg)
    
    processorStopCh := make(chan struct{})
    go s.processor.run(processorStopCh) // Start processor
    
    s.controller.Run(stopCh) // Start controller
}

SharedInformerFactory Implemantation

Provides grouped informers for API resources:

type sharedInformerFactory struct {
    client        kubernetes.Interface
    informers     map[reflect.Type]cache.SharedIndexInformer
    // ...
}

Initialization:

func NewSharedInformerFactory(client kubernetes.Interface, resync time.Duration) SharedInformerFactory {
    return &sharedInformerFactory{
        client:     client,
        defaultResync: resync,
        informers: make(map[reflect.Type]cache.SharedIndexInformer),
    }
}

Starting all informers:

func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
    for informerType, informer := range f.informers {
        go informer.Run(stopCh) // Start each informer
    }
}

Resource-specific informers (e.g., for Deployments) are accessed via:

func (f *sharedInformerFactory) Apps() apps.Interface {
    return appsinformer.New(f, f.namespace, f.tweakListOptions)
}

This hierarchical structure allows retrieving specialized informers for any Kubernetes resource.

Related Articles

Designing Alertmanager Templates for Prometheus Notifications

How to craft Alertmanager templates to format alert messages, improving clarity and presentation. Alertmanager uses Go’s text/template engine with additional helper functions. Alerting rules referenc...

Deploying a Maven Web Application to Tomcat 9 Using the Tomcat Manager

Tomcat 9 does not provide a dedicated Maven plugin. The Tomcat Manager interface, however, is backward-compatible, so the Tomcat 7 Maven Plugin can be used to deploy to Tomcat 9. This guide shows two...

Skipping Errors in MySQL Asynchronous Replication

When a replica halts because the SQL thread encounters an error, you can resume replication by skipping the problematic event(s). Two common approaches are available. Methods to Skip Errors 1) Skip a...

Leave a Comment

Anonymous

◎Feel free to join the discussion and share your thoughts.