Kubernetes Scheduler Predicate Filtering: A Deep Dive into Node Selection Logic
Predicate Filtering Entry Point
The node filtering phase begins at pkg/scheduler/core/generic_scheduler.go:389 within the findNodesThatFit() method. This function evaluates every cluster node against a set of predicate rules to identify viable placement targets for a pod. The simplified listing below, which names the method filterValidNodes, follows this pattern:
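func (sched *standardScheduler) filterValidNodes(targetPod *v1.Pod, clusterNodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
	// meta, nodeCache, and equivClass are prepared before this point (setup elided).
	var (
		viableNodeCount int32      // slots filled in viableNodes; updated atomically
		resultLock      sync.Mutex // guards failureReasonsByNode
	)
	failureReasonsByNode := FailedPredicateMap{}
	// Preallocate one slot per node so workers can write without growing the slice.
	viableNodes := make([]*v1.Node, len(clusterNodes))
	evaluateNode := func(nodeIndex int) {
		nodeName := clusterNodes[nodeIndex].Name
		compatible, failureReasons, evaluationError := nodeMeetsPredicates(
			targetPod,
			meta,
			sched.nodeInfoCache[nodeName],
			sched.predicates,
			nodeCache,
			sched.schedulingQueue,
			sched.exhaustiveChecks,
			equivClass,
		)
		if evaluationError != nil {
			return // error aggregation elided
		}
		if compatible {
			position := atomic.AddInt32(&viableNodeCount, 1)
			viableNodes[position-1] = sched.nodeInfoCache[nodeName].Node()
		} else {
			resultLock.Lock()
			failureReasonsByNode[nodeName] = failureReasons
			resultLock.Unlock()
		}
	}
	workqueue.ParallelizeUntil(context.Background(), 16, len(clusterNodes), evaluateNode)
	// Drop the unused tail so that only nodes that passed remain.
	viableNodes = viableNodes[:viableNodeCount]
	if len(viableNodes) > 0 && len(sched.extenders) > 0 {
		for range sched.extenders {
			// Extender integration logic (covered separately)
		}
	}
	return viableNodes, failureReasonsByNode, nil
}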
The method delegates individual node evaluation to nodeMeetsPredicates() while orchestrating concurrent execution across the node fleet.
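The result-collection idiom used here (reserve a slot with an atomic counter, write into a preallocated slice, trim afterwards) can be tried in isolation. The following is a minimal sketch using only the standard library; collectPassing and its check callback are hypothetical names for illustration, not part of the scheduler.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// collectPassing runs check on every index in [0, n) concurrently and returns
// the indices for which check returned true. Result order is not deterministic.
func collectPassing(n int, check func(i int) bool) []int {
	passed := make([]int, n) // one slot per candidate, so no append is needed
	var passedCount int32

	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func(i int) {
			defer wg.Done()
			if check(i) {
				// AddInt32 returns the new count, so slot-1 is reserved
				// exclusively for this goroutine.
				slot := atomic.AddInt32(&passedCount, 1)
				passed[slot-1] = i
			}
		}(i)
	}
	wg.Wait()

	// Trim the unused tail; without this the slice would still have length n.
	return passed[:passedCount]
}

func main() {
	even := collectPassing(10, func(i int) bool { return i%2 == 0 })
	fmt.Println(len(even)) // prints 5
}
```

Because each goroutine writes to a distinct index, no mutex is needed for the slice itself; only the counter has to be atomic.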
Concurrent Node Evaluation Mechanism
The evaluateNode function executes within a parallel worker pool managed by ParallelizeUntil:
workqueue.ParallelizeUntil(context.Background(), 16, len(clusterNodes), evaluateNode)
This utility function implements a fixed-size worker pattern:
func ParallelizeUntil(ctx context.Context, workerCount, taskCount int, executeTask TaskExecutor) {
	stopChannel := ctx.Done()
	// Queue every task index up front; the buffer holds them all, so this never blocks.
	taskQueue := make(chan int, taskCount)
	for taskID := 0; taskID < taskCount; taskID++ {
		taskQueue <- taskID
	}
	close(taskQueue)
	// Never start more workers than there are tasks.
	activeWorkers := workerCount
	if taskCount < workerCount {
		activeWorkers = taskCount
	}
	var completionGroup sync.WaitGroup
	completionGroup.Add(activeWorkers)
	for workerID := 0; workerID < activeWorkers; workerID++ {
		go func() {
			defer runtime.HandleCrash()
			defer completionGroup.Done()
			for currentTask := range taskQueue {
				select {
				case <-stopChannel:
					// Context cancelled: abandon the remaining tasks.
					return
				default:
					executeTask(currentTask)
				}
			}
		}()
	}
	// Block until every worker has drained the queue or stopped.
	completionGroup.Wait()
}
The implementation fills a buffered channel with every task index and closes it, then starts min(workerCount, taskCount) goroutines that drain the channel. Each worker checks the context's Done channel before every task and exits early on cancellation; otherwise the workers run until the channel is empty. The scheduler passes a worker count of 16, which bounds concurrency regardless of cluster size.
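To illustrate what the stop channel makes possible, here is a minimal standard-library sketch in which the caller cancels the context once enough results have been found. parallelizeUntil is a simplified stand-in for the helper above, and findUpTo is a hypothetical caller invented for this example; the listing in this article passes context.Background(), which is never cancelled.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// parallelizeUntil runs doTask for indices [0, tasks) on at most `workers`
// goroutines and stops handing out work once ctx is cancelled.
func parallelizeUntil(ctx context.Context, workers, tasks int, doTask func(i int)) {
	queue := make(chan int, tasks)
	for i := 0; i < tasks; i++ {
		queue <- i
	}
	close(queue)

	if tasks < workers {
		workers = tasks
	}
	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			for i := range queue {
				select {
				case <-ctx.Done():
					return
				default:
					doTask(i)
				}
			}
		}()
	}
	wg.Wait()
}

// findUpTo counts accepted candidates among n, but stops the pool as soon as
// more than `limit` have been accepted and never reports more than `limit`.
func findUpTo(n, limit int, accept func(i int) bool) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var found int32
	parallelizeUntil(ctx, 16, n, func(i int) {
		if !accept(i) {
			return
		}
		if atomic.AddInt32(&found, 1) > int32(limit) {
			// Over the limit: undo this increment and stop the workers.
			atomic.AddInt32(&found, -1)
			cancel()
		}
	})
	return int(atomic.LoadInt32(&found))
}

func main() {
	fmt.Println(findUpTo(1000, 10, func(int) bool { return true })) // prints 10
}
```

Tasks already in flight when cancel is called still finish; cancellation only prevents workers from picking up further indices.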
Single Node Compatibility Assessment
The nodeMeetsPredicates() function performs the actual predicate evaluation for an individual node:
func nodeMeetsPredicates(
	targetPod *v1.Pod,
	metadata algorithm.PredicateMetadata,
	nodeDetails *schedulercache.NodeInfo,
	predicateRegistry map[string]algorithm.FitPredicate,
	cache *equivalence.NodeCache,
	schedulingQueue internalqueue.SchedulingQueue,
	exhaustiveCheck bool,
	equivalenceClass *equivalence.Class,
) (bool, []algorithm.PredicateFailureReason, error) {
	// failures accumulates reasons across predicates, so it must outlive both loops.
	var failures []algorithm.PredicateFailureReason
	nominatedPodsInjected := false
	for attempt := 0; attempt < 2; attempt++ {
		metaForAttempt := metadata
		nodeForAttempt := nodeDetails
		if attempt == 0 {
			// First pass: evaluate as if the nominated pods were already on the node.
			nominatedPodsInjected, metaForAttempt, nodeForAttempt = incorporateNominatedPods(targetPod, metadata, nodeDetails, schedulingQueue)
		} else if !nominatedPodsInjected || len(failures) > 0 {
			// Skip the second pass if nothing was injected or the first pass already failed.
			break
		}
		// The cache is bypassed whenever nominated pods were injected.
		cacheUsable := equivalenceClass != nil && cache != nil && !nominatedPodsInjected
		for predicateIndex, predicateName := range predicates.Ordering() {
			var (
				satisfied bool
				reasons   []algorithm.PredicateFailureReason
				evalError error
			)
			if predicateFunc, found := predicateRegistry[predicateName]; found {
				if cacheUsable {
					satisfied, reasons, evalError = cache.RunPredicate(predicateFunc, predicateName, predicateIndex, targetPod, metaForAttempt, nodeForAttempt, equivalenceClass)
				} else {
					satisfied, reasons, evalError = predicateFunc(targetPod, metaForAttempt, nodeForAttempt)
				}
				if evalError != nil {
					return false, []algorithm.PredicateFailureReason{}, evalError
				}
				if !satisfied {
					failures = append(failures, reasons...)
					// Stop at the first failure unless every predicate must be checked.
					if !exhaustiveCheck {
						break
					}
				}
			}
		}
	}
	return len(failures) == 0, failures, nil
}
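The two passes handle nominated pods, meaning pods that have been nominated to run on the node but are not yet bound to it. The first pass evaluates the predicates as if the nominated pods were already running there. The second pass, which runs only when nominated pods were actually added and the first pass produced no failures, evaluates them without those pods. A node is accepted only if it passes both. This conservative strategy matters for predicates whose outcome depends on which other pods are present, such as inter-pod affinity rules.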
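A toy model makes the need for both passes concrete. The sketch below is not the scheduler's code: pod, nodeState, fitsCPU, and requiresPeer are invented for illustration, with requiresPeer standing in for an affinity-style rule that can succeed only because a nominated pod is assumed present.

```go
package main

import "fmt"

type pod struct {
	name string
	cpu  int
}

// nodeState is a toy node snapshot: a CPU capacity and the pods assumed present.
type nodeState struct {
	cpuCapacity int
	pods        []pod
}

type predicate func(p pod, n nodeState) bool

// fitsCPU passes if the pod's request fits next to everything assumed present.
func fitsCPU(p pod, n nodeState) bool {
	used := 0
	for _, q := range n.pods {
		used += q.cpu
	}
	return used+p.cpu <= n.cpuCapacity
}

// requiresPeer mimics an affinity rule: the pod only fits next to a pod named peer.
func requiresPeer(peer string) predicate {
	return func(_ pod, n nodeState) bool {
		for _, q := range n.pods {
			if q.name == peer {
				return true
			}
		}
		return false
	}
}

// fitsOnNode evaluates the predicates with the nominated pods added and then,
// if any were added and the first pass succeeded, again without them.
func fitsOnNode(p pod, n nodeState, nominated []pod, preds []predicate) bool {
	for pass := 0; pass < 2; pass++ {
		state := n
		if pass == 0 {
			// Copy so the caller's slice is never modified.
			state.pods = append(append([]pod{}, n.pods...), nominated...)
		} else if len(nominated) == 0 {
			break // nothing was added, so the first pass already covered this case
		}
		for _, pred := range preds {
			if !pred(p, state) {
				return false
			}
		}
	}
	return true
}

func main() {
	n := nodeState{cpuCapacity: 4, pods: []pod{{name: "db", cpu: 2}}}
	nominated := []pod{{name: "cache", cpu: 1}}
	web := pod{name: "web", cpu: 1}

	fmt.Println(fitsOnNode(web, n, nominated, []predicate{fitsCPU}))               // prints true
	fmt.Println(fitsOnNode(web, n, nominated, []predicate{requiresPeer("cache")})) // prints false
}
```

The second call fails only in the pass without nominated pods: if "cache" never actually lands on the node, the affinity-style requirement would be violated, which is exactly the case the second pass guards against.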
Predicate Evaluation Sequence
Predicates execute in a predetermined order defined by:
var predicatesOrdering = []string{
	ValidateNodeCondition,
	CheckNodeSchedulable,
	GeneralPredicate,
	HostnameMatch,
	NodePortAvailability,
	NodeSelectorCompliance,
	ResourceSufficiency,
	StorageCompatibility,
	TaintTolerance,
	NoExecuteTaintTolerance,
	LabelPresenceCheck,
	ServiceAffinity,
	MaxEBSVolumeCount,
	MaxGCEPDVolumeCount,
	MaxCSIVolumeCount,
	MaxAzureDiskVolumeCount,
	VolumeBindingCheck,
	VolumeZoneConflict,
	MemoryPressureCheck,
	PIDPressureCheck,
	DiskPressureCheck,
	InterPodAffinity,
}
This ordering optimizes early rejection of unfit nodes. The rationale behind key positions:
| Position | Predicate | Justification |
|---|---|---|
| 1 | Node Condition Validation | Eliminate nodes in problematic states immediately |
| 4 | Hostname Matching | Fast exact-match check for pod.spec.nodeName |
| 5 | Port Availability | Validate requested host ports early |
| 6 | Node Selector | Filter by node labels after basic checks |
| 7 | Resource Sufficiency | Range-based validation, less restrictive than exact matches |
| 8 | Storage Compatibility | Check volume conflicts after resource validation |
| 9 | Taint Tolerance | Evaluate node taints against pod tolerations |
| 19-21 | Pressure Checks | Infrequent conditions placed later in sequence |
| 22 | Inter-Pod Affinity | Most computationally expensive predicate last |
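The benefit of ordering shows up when evaluation stops at the first failing predicate. The sketch below is a simplified model rather than the scheduler's implementation: the three predicates, the podSpec and nodeFacts types, and the evaluate function are assumptions made for illustration. It reports which predicates actually ran, so the early exit is visible.

```go
package main

import "fmt"

// Toy stand-ins for the pod and node data a predicate would inspect.
type podSpec struct {
	cpu      int
	nodeName string // empty means "any node"
}

type nodeFacts struct {
	name    string
	ready   bool
	freeCPU int
}

// predicate returns whether the pod fits and, if not, a short reason.
type predicate func(p podSpec, n nodeFacts) (bool, string)

func nodeReady(_ podSpec, n nodeFacts) (bool, string) {
	if !n.ready {
		return false, "node not ready"
	}
	return true, ""
}

func hostnameMatch(p podSpec, n nodeFacts) (bool, string) {
	if p.nodeName != "" && p.nodeName != n.name {
		return false, "hostname mismatch"
	}
	return true, ""
}

func resourceSufficiency(p podSpec, n nodeFacts) (bool, string) {
	if p.cpu > n.freeCPU {
		return false, "insufficient cpu"
	}
	return true, ""
}

// ordering lists predicate names in the sequence they should be evaluated.
var ordering = []string{"NodeReady", "HostnameMatch", "ResourceSufficiency"}

var demoRegistry = map[string]predicate{
	"NodeReady":           nodeReady,
	"HostnameMatch":       hostnameMatch,
	"ResourceSufficiency": resourceSufficiency,
}

// evaluate walks the ordering, skips names missing from the registry, and
// stops at the first failure. It also returns the names that were run.
func evaluate(p podSpec, n nodeFacts, registry map[string]predicate) (bool, string, []string) {
	var ran []string
	for _, name := range ordering {
		pred, found := registry[name]
		if !found {
			continue
		}
		ran = append(ran, name)
		if ok, reason := pred(p, n); !ok {
			return false, reason, ran
		}
	}
	return true, "", ran
}

func main() {
	fits, reason, ran := evaluate(podSpec{cpu: 2}, nodeFacts{name: "n1", ready: false, freeCPU: 8}, demoRegistry)
	fmt.Println(fits, reason, ran) // prints: false node not ready [NodeReady]
}
```

For a node that is not ready, only the first predicate runs; the later, potentially costlier checks are never invoked.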
This sequence can be overridden through scheduling policy configuration:
{
  "kind": "SchedulingPolicy",
  "apiVersion": "v1",
  "predicates": [
    {"name": "NodePortAvailability", "order": 2},
    {"name": "ResourceSufficiency", "order": 3},
    {"name": "StorageCompatibility", "order": 5},
    {"name": "TaintTolerance", "order": 4},
    {"name": "NodeSelectorCompliance", "order": 6},
    {"name": "HostnameMatch", "order": 1}
  ],
  "priorities": [
    {"name": "ResourceOptimization", "weight": 1},
    {"name": "BalancedDistribution", "weight": 1},
    {"name": "ServiceSpreading", "weight": 1},
    {"name": "UniformPriority", "weight": 1}
  ],
  "affinityWeight": 10
}
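Note that the entries in the predicates array need not appear in order; the order field decides the sequence. As a rough illustration of how such a document could be turned into an evaluation order, the sketch below parses the predicates array and sorts it by that field. The struct and function names are hypothetical, the JSON field names are taken from the listing above, and this is not how the scheduler itself loads its policy.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

// predicateEntry mirrors one element of the "predicates" array.
type predicateEntry struct {
	Name  string `json:"name"`
	Order int    `json:"order"`
}

// policy captures only the part of the document this example needs;
// other fields in the JSON are ignored by json.Unmarshal.
type policy struct {
	Predicates []predicateEntry `json:"predicates"`
}

// orderedPredicateNames returns the predicate names sorted by ascending order.
func orderedPredicateNames(raw []byte) ([]string, error) {
	var p policy
	if err := json.Unmarshal(raw, &p); err != nil {
		return nil, err
	}
	// A stable sort keeps the file order for entries that share an order value.
	sort.SliceStable(p.Predicates, func(i, j int) bool {
		return p.Predicates[i].Order < p.Predicates[j].Order
	})
	names := make([]string, 0, len(p.Predicates))
	for _, entry := range p.Predicates {
		names = append(names, entry.Name)
	}
	return names, nil
}

const samplePolicy = `{
  "predicates": [
    {"name": "NodePortAvailability", "order": 2},
    {"name": "TaintTolerance", "order": 4},
    {"name": "HostnameMatch", "order": 1}
  ]
}`

func main() {
	names, err := orderedPredicateNames([]byte(samplePolicy))
	if err != nil {
		fmt.Println("invalid policy:", err)
		return
	}
	fmt.Println(names) // prints: [HostnameMatch NodePortAvailability TaintTolerance]
}
```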
Individual Predicate Execution
Each predicate conforms to the FitPredicate signature:
type FitPredicate func(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []PredicateFailureReason, error)
The execution loop looks up each predicate by name in the registry and invokes it either through the equivalence cache, when the cache is usable, or directly:
if predicateFunc, found := predicateRegistry[predicateName]; found {
	if cacheUsable {
		satisfied, failures, evalError = cache.RunPredicate(predicateFunc, predicateName, predicateIndex, targetPod, metaForAttempt, nodeForAttempt, equivalenceClass)
	} else {
		satisfied, failures, evalError = predicateFunc(targetPod, metaForAttempt, nodeForAttempt)
	}
}
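The cached branch can be thought of as memoization: if an equivalent pod has already been evaluated against this node for the same predicate, the stored result is reused. The sketch below illustrates that idea only. The key layout, the predicateCache type, and its run method are assumptions made for this example and do not reflect the actual equivalence package.

```go
package main

import (
	"fmt"
	"sync"
)

// outcome is what a predicate produced for one class of equivalent pods.
type outcome struct {
	fits    bool
	reasons []string
}

// cacheKey identifies a result by predicate name and an equivalence hash
// (assumed here to be a number shared by pods with identical requirements).
type cacheKey struct {
	predicateName string
	equivHash     uint64
}

// predicateCache holds results for a single node.
type predicateCache struct {
	mu      sync.Mutex
	results map[cacheKey]outcome
}

func newPredicateCache() *predicateCache {
	return &predicateCache{results: make(map[cacheKey]outcome)}
}

// run returns the cached outcome for (name, equivHash) if one exists;
// otherwise it calls compute, stores the result, and returns it.
// hit reports whether the cache was used. The lock is held while compute
// runs, which keeps the sketch simple at the cost of serializing lookups.
func (c *predicateCache) run(name string, equivHash uint64, compute func() outcome) (res outcome, hit bool) {
	key := cacheKey{predicateName: name, equivHash: equivHash}
	c.mu.Lock()
	defer c.mu.Unlock()
	if cached, ok := c.results[key]; ok {
		return cached, true
	}
	res = compute()
	c.results[key] = res
	return res, false
}

func main() {
	cache := newPredicateCache()
	calls := 0
	check := func() outcome {
		calls++
		return outcome{fits: false, reasons: []string{"insufficient cpu"}}
	}

	cache.run("ResourceSufficiency", 42, check)           // miss: check runs
	_, hit := cache.run("ResourceSufficiency", 42, check) // hit: check is skipped
	fmt.Println(hit, calls)                               // prints: true 1
}
```

A real cache also needs invalidation when the node's state changes; that is omitted here.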
Concrete Predicate Implementation
Consider the storage conflict prevention predicate:
func PreventStorageConflicts(candidatePod *v1.Pod, metadata algorithm.PredicateMetadata, nodeSnapshot *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
	podVolumes := candidatePod.Spec.Volumes
	existingPods := nodeSnapshot.Pods()
	// Compare every volume of the candidate against every pod already on the node.
	for _, volume := range podVolumes {
		for _, runningPod := range existingPods {
			if volumesIntersect(volume, runningPod) {
				// One conflict is enough to reject the node.
				return false, []algorithm.PredicateFailureReason{ErrStorageConflict}, nil
			}
		}
	}
	return true, nil, nil
}
This predicate iterates through the candidate pod's volume list and cross-references against all pods currently assigned to the node, detecting any storage conflicts. The pattern demonstrates the typical structure: inspect pod specifications, compare against node state, and return a boolean result with optional failure details.
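The same structure can be exercised without any Kubernetes dependencies. In the sketch below the volume and pod types are toy stand-ins, and the conflict rule (two mounts of the same disk conflict unless both are read-only) is an assumption chosen for illustration, not a statement of the real predicate's rules.

```go
package main

import "fmt"

type volume struct {
	diskID   string
	readOnly bool
}

type pod struct {
	name    string
	volumes []volume
}

// volumesConflict applies the illustrative rule: the same disk mounted twice
// conflicts unless both mounts are read-only.
func volumesConflict(a, b volume) bool {
	return a.diskID == b.diskID && !(a.readOnly && b.readOnly)
}

// noStorageConflict follows the predicate shape: inspect the candidate,
// compare against the pods on the node, and return a verdict plus reasons.
func noStorageConflict(candidate pod, running []pod) (bool, []string, error) {
	for _, want := range candidate.volumes {
		for _, other := range running {
			for _, have := range other.volumes {
				if volumesConflict(want, have) {
					reason := fmt.Sprintf("disk %s already used by pod %s", want.diskID, other.name)
					return false, []string{reason}, nil
				}
			}
		}
	}
	return true, nil, nil
}

func main() {
	running := []pod{{name: "db", volumes: []volume{{diskID: "disk-1"}}}}
	candidate := pod{name: "backup", volumes: []volume{{diskID: "disk-1", readOnly: true}}}

	fits, reasons, err := noStorageConflict(candidate, running)
	fmt.Println(fits, reasons, err) // prints: false [disk disk-1 already used by pod db] <nil>
}
```

The error return is reserved for cases where the check itself cannot be completed; an ordinary "does not fit" verdict is reported through the boolean and the reasons, never through the error.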