Flink Task Execution Pipeline: From Transformations to JobGraph
Job Transformation Pipeline
When a user builds a Flink job, each DataStream API call such as map, flatMap, filter, or process passes through a chain of method calls: transform() → doTransform() → addOperator(). This process accumulates the resulting transformations into a List<Transformation<?>> held by the execution environment.
When the job is executed via execute(), the StreamGraphGenerator.generate() method turns this list into the stream topology, the StreamGraph (an implementation of Pipeline), which forms a directed acyclic graph (DAG).
Within StreamGraph, StreamNode entities store operator metadata, while StreamEdge entities define data exchange patterns between operators.
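To make the node/edge split concrete, here is a deliberately simplified, hypothetical model of that structure (the real classes live in org.apache.flink.streaming.api.graph and carry far more metadata such as operator factories, types, and parallelism):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of StreamGraph: nodes hold operator metadata,
// edges describe the data exchange between two nodes.
class MiniStreamGraph {
    record Node(int id, String operatorName) {}
    record Edge(int sourceId, int targetId, String partitioner) {}

    final List<Node> nodes = new ArrayList<>();
    final List<Edge> edges = new ArrayList<>();

    int addNode(String operatorName) {
        int id = nodes.size();
        nodes.add(new Node(id, operatorName));
        return id;
    }

    void addEdge(int from, int to, String partitioner) {
        edges.add(new Edge(from, to, partitioner));
    }

    public static void main(String[] args) {
        MiniStreamGraph g = new MiniStreamGraph();
        int source = g.addNode("Source");
        int map = g.addNode("Map");
        int sink = g.addNode("Sink");
        g.addEdge(source, map, "FORWARD");   // source -> map, pointwise
        g.addEdge(map, sink, "REBALANCE");   // map -> sink, round-robin
        System.out.println(g.nodes.size() + " nodes, " + g.edges.size() + " edges");
    }
}
```

The essential point survives the simplification: the graph stores operators and exchanges separately, which is what later lets the translator rewrite edges (e.g., fuse a FORWARD edge into a chain) without touching the operators themselves.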
Distribution Patterns
Flink uses partitioner classes that extend StreamPartitioner to determine how records are routed between subtasks; each partitioner's isPointwise() method decides which of two DistributionPattern categories it maps to:
/**
 * Defines how producer subtasks connect to consumer subtasks.
 *
 * <p>This pattern influences the connections between {@link ExecutionVertex}
 * and {@link IntermediateResultPartition} in {@link EdgeManagerBuildUtil}.
 */
public enum DistributionPattern {

    /** Every producer subtask connects to all consumer subtasks. */
    ALL_TO_ALL,

    /** Every producer subtask connects to one or more consumer subtasks. */
    POINTWISE
}
ALL_TO_ALL requires each upstream subtask to establish connections with every downstream subtask.
POINTWISE allows each upstream subtask to connect to one or more downstream subtasks.
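The two patterns can be illustrated with a small sketch of the connection computation. This is a hypothetical simplification of what Flink's EdgeManagerBuildUtil does (the class names here are made up, and the POINTWISE branch only covers the case where producers outnumber or match consumers; the real code also handles the inverse case with range-based mappings):

```java
import java.util.ArrayList;
import java.util.List;

public class DistributionPatterns {
    enum DistributionPattern { ALL_TO_ALL, POINTWISE }

    /** Returns, for each producer subtask, the consumer subtasks it feeds. */
    static List<List<Integer>> connect(DistributionPattern pattern, int producers, int consumers) {
        List<List<Integer>> result = new ArrayList<>();
        for (int p = 0; p < producers; p++) {
            List<Integer> targets = new ArrayList<>();
            if (pattern == DistributionPattern.ALL_TO_ALL) {
                // every producer feeds every consumer
                for (int c = 0; c < consumers; c++) targets.add(c);
            } else {
                // POINTWISE, simplified for producers >= consumers:
                // producers are grouped evenly onto consumers
                targets.add(p * consumers / producers);
            }
            result.add(targets);
        }
        return result;
    }
}
```

With 4 producers and 2 consumers, ALL_TO_ALL yields 8 connections while POINTWISE yields only 4, which is why pointwise exchanges (forward, rescale) are much cheaper on the network layer.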
Building JobGraph
Once StreamGraph construction completes, the system invokes PipelineExecutorUtils.getJobGraph() to generate the JobGraph. The execution path follows this sequence:
PipelineExecutorUtils.getJobGraph()
  → FlinkPipelineTranslationUtil.getJobGraph()
    → StreamGraphTranslator.translateToJobGraph()
      → StreamGraph.getJobGraph()
        → StreamingJobGraphGenerator.createJobGraph()
JobGraph is an optimized version of the StreamGraph: when consecutive operators satisfy the chaining criteria, their StreamNodes are merged into a single JobVertex, so the chained operators run within one task. This optimization is implemented in the setChaining() method of StreamingJobGraphGenerator:
/**
 * Constructs task chains starting from source {@link StreamNode} instances.
 *
 * <p>This method recursively generates all {@link JobVertex} instances.
 */
private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {
    // differentiate between sources functioning as chained inputs versus standalone head operators
    final Map<Integer, OperatorChainInfo> chainEntryPoints =
            buildChainedInputsAndGetHeadInputs(hashes, legacyHashes);
    final Collection<OperatorChainInfo> initialEntryPoints =
            chainEntryPoints.entrySet().stream()
                    .sorted(Comparator.comparing(Map.Entry::getKey))
                    .map(Map.Entry::getValue)
                    .collect(Collectors.toList());

    // iterate over a copy, since the map undergoes concurrent modifications
    for (OperatorChainInfo info : initialEntryPoints) {
        createChain(
                info.getStartNodeId(),
                1, // operators start at position 1 because 0 is reserved for chained source inputs
                info,
                chainEntryPoints);
    }
}
Chaining Requirements
Operators qualify for chaining only when all of the following conditions hold:
1. The downstream node has exactly one input edge
2. The upstream and downstream operators belong to the same SlotSharingGroup
3. The data distribution strategy is Forward
4. The stream exchange mode is not batch mode
5. The upstream and downstream operators have equal parallelism
6. Chaining is enabled in the StreamGraph configuration
7. The operators pass the chainability check in areOperatorsChainable()
These conditions ensure that only logically compatible operators get fused together, reducing network overhead and improving task execution efficiency.
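The seven conditions above can be condensed into a single predicate. The sketch below is a hypothetical stand-in for Flink's real check (which lives in StreamingJobGraphGenerator.isChainable() and works on StreamEdge/StreamGraph objects rather than the flat record used here):

```java
// Hypothetical condensed chainability check; field and parameter names are
// illustrative, not Flink's actual API.
public class ChainingCheck {
    record NodeInfo(int inEdges, String slotSharingGroup, int parallelism, boolean operatorChainable) {}

    static boolean isChainable(NodeInfo upstream, NodeInfo downstream,
                               boolean forwardPartitioner, boolean batchExchange,
                               boolean chainingEnabled) {
        return downstream.inEdges() == 1                                       // 1. single input edge
            && upstream.slotSharingGroup().equals(downstream.slotSharingGroup()) // 2. same slot sharing group
            && forwardPartitioner                                              // 3. Forward distribution
            && !batchExchange                                                  // 4. not batch exchange mode
            && upstream.parallelism() == downstream.parallelism()              // 5. equal parallelism
            && chainingEnabled                                                 // 6. chaining enabled
            && upstream.operatorChainable() && downstream.operatorChainable(); // 7. areOperatorsChainable()
    }
}
```

Because the conditions are combined with logical AND, failing any single one (for example, a rebalance between two maps breaking condition 3) splits the chain at that edge and forces a network exchange.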