Deep Dive into Kubernetes client-go Informer Mechanism
Controller Architectuer
Controller serves as the core component for initializing and managing the Informer lifecycle. Its structure is defined as follows:
type controller struct {
config Config
reflector *Reflector
reflectorMutex sync.RWMutex
clock clock.Clock
}
The controller interface exposes essential methods:
type Controller interface {
Run(stopCh <-chan struct{})
HasSynced() bool
LastSyncResourceVersion() string
}
The Run() method handles two primary tasks:
- Initializes the Reflector to synchronize object events into DeltaFIFO
- Processes objects popped from DeltaFIFO using the configured ProcessFunc
Initialization
Controller initialization is straightforward:
func New(c *Config) Controller {
return &controller{
config: *c,
clock: &clock.RealClock{},
}
}
In practice, SharedIndexInformer initializes the controller during its Run() method:
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ /*...*/ })
cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
Process: s.HandleDeltas,
// ...
}
s.controller = New(cfg)
s.controller.Run(stopCh)
}
Execution Flow
When started, the controller performs:
func (c *controller) Run(stopCh <-chan struct{}) {
r := NewReflector( // Initialize Reflector with config
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
// Configure reflector properties
go r.Run(stopCh) // Start reflector
wait.Until(c.processLoop, time.Second, stopCh)
}
The processLoop() continuously pops objects from DeltaFIFO:
func (c *controller) processLoop() {
for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
// Handle errors and requeue if necessary
}
}
Event Processing via HandleDeltas
SharedIndexInformer's HandleDeltas() method processes Delta objects:
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
deltas := obj.(Deltas)
for _, delta := range deltas {
switch delta.Type {
case Sync, Replaced, Added, Updated:
if existing, exists, err := s.indexer.Get(delta.Object); err == nil && exists {
s.indexer.Update(delta.Object) // Update cache
s.processor.distribute(updateNotification{ /*...*/ }, false)
} else {
s.indexer.Add(delta.Object) // Add to cache
s.processor.distribute(addNotification{ /*...*/ }, false)
}
case Deleted:
s.indexer.Delete(delta.Object) // Remove from cache
s.processor.distribute(deleteNotification{ /*...*/ }, false)
}
}
return nil
}
SharedIndexInformer Architecture
SharedIndexInformer implements enhanced event distribution:
type sharedIndexInformer struct {
indexer Indexer
controller Controller
processor *sharedProcessor
// ...
}
Processor Mechanism
sharedProcessor manages multiple processorListener instances:
type sharedProcessor struct {
listeners []*processorListener
syncingListeners []*processorListener
// ...
}
Each processorListener routes notifications to its ResourceEventHandler:
type processorListener struct {
nextCh chan interface{}
addCh chan interface{}
handler ResourceEventHandler
// ...
}
Key methods:
run(): Dispatches events to handler methodsadd(): Enqueues incoming notificationspop(): Moves notifications from buffer to disptach channel
SharedIndexInformer Execution
The Run() method coordinates components:
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
fifo := NewDeltaFIFOWithOptions( /*...*/ )
cfg := &Config{ /*...*/ }
s.controller = New(cfg)
processorStopCh := make(chan struct{})
go s.processor.run(processorStopCh) // Start processor
s.controller.Run(stopCh) // Start controller
}
SharedInformerFactory Implemantation
Provides grouped informers for API resources:
type sharedInformerFactory struct {
client kubernetes.Interface
informers map[reflect.Type]cache.SharedIndexInformer
// ...
}
Initialization:
func NewSharedInformerFactory(client kubernetes.Interface, resync time.Duration) SharedInformerFactory {
return &sharedInformerFactory{
client: client,
defaultResync: resync,
informers: make(map[reflect.Type]cache.SharedIndexInformer),
}
}
Starting all informers:
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
for informerType, informer := range f.informers {
go informer.Run(stopCh) // Start each informer
}
}
Resource-specific informers (e.g., for Deployments) are accessed via:
func (f *sharedInformerFactory) Apps() apps.Interface {
return appsinformer.New(f, f.namespace, f.tweakListOptions)
}
This hierarchical structure allows retrieving specialized informers for any Kubernetes resource.