Fading Coder

One Final Commit for the Last Sprint


Flink Task Execution Pipeline: From Transformations to JobGraph

Tech May 9 3

Job Transformation Pipeline

As a Flink program defines its job, each operator call passes through a chain of method calls: transform() -> doTransform() -> addOperator(). This process accumulates operators such as map, flatMap, filter, and process in a List<Transformation<?>> collection held by the execution environment.
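The accumulation step can be pictured with a small stand-alone model. This is only a sketch and not Flink code: the Env and Stream classes are hypothetical stand-ins for the execution environment and DataStream, and transformations are reduced to plain strings.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Flink code) of how fluent operator calls accumulate transformations. */
final class TransformationCollectionSketch {

    /** Hypothetical stand-in for the environment that owns the transformation list. */
    static final class Env {
        final List<String> transformations = new ArrayList<>();

        void addOperator(String transformation) {
            transformations.add(transformation);
        }
    }

    /** Hypothetical stand-in for a DataStream; every operator call returns a new stream. */
    static final class Stream {
        private final Env env;

        Stream(Env env) {
            this.env = env;
        }

        Stream map() { return transform("Map"); }
        Stream filter() { return transform("Filter"); }
        Stream flatMap() { return transform("FlatMap"); }

        // transform() -> doTransform() -> addOperator(), mirroring the chain in the text
        private Stream transform(String operatorName) {
            return doTransform(operatorName);
        }

        private Stream doTransform(String operatorName) {
            env.addOperator(operatorName);
            return new Stream(env);
        }
    }

    /** Builds a three-operator pipeline and returns what the environment recorded. */
    static List<String> collect() {
        Env env = new Env();
        new Stream(env).map().filter().flatMap();
        return env.transformations;
    }

    public static void main(String[] args) {
        System.out.println(collect()); // [Map, Filter, FlatMap]
    }
}
```

Nothing runs at this stage; the list is only a record of what the program asked for, which is why graph generation can happen later in one pass.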

When the job is executed via execute(), the StreamGraphGenerator.generate() method builds the stream topology, called StreamGraph (an implementation of the Pipeline interface), which is structured as a directed acyclic graph (DAG).

Within StreamGraph, StreamNode entities store operator metadata, while StreamEdge entities define data exchange patterns between operators.
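To make the node/edge split concrete, here is a minimal sketch of such a topology. It is not Flink's StreamGraph: the Node and Edge classes, their fields, and the partitioner labels are simplified assumptions chosen for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model (not Flink code) of a stream topology made of nodes and edges. */
final class StreamGraphSketch {

    /** Hypothetical stand-in for StreamNode: carries operator metadata. */
    static final class Node {
        final int id;
        final String operatorName;
        final int parallelism;

        Node(int id, String operatorName, int parallelism) {
            this.id = id;
            this.operatorName = operatorName;
            this.parallelism = parallelism;
        }
    }

    /** Hypothetical stand-in for StreamEdge: describes how records travel between two nodes. */
    static final class Edge {
        final Node source;
        final Node target;
        final String partitioner;

        Edge(Node source, Node target, String partitioner) {
            this.source = source;
            this.target = target;
            this.partitioner = partitioner;
        }
    }

    private final Map<Integer, Node> nodes = new LinkedHashMap<>();
    private final List<Edge> edges = new ArrayList<>();

    void addNode(int id, String operatorName, int parallelism) {
        nodes.put(id, new Node(id, operatorName, parallelism));
    }

    void addEdge(int sourceId, int targetId, String partitioner) {
        Node source = nodes.get(sourceId);
        Node target = nodes.get(targetId);
        if (source == null || target == null) {
            throw new IllegalArgumentException("unknown node id");
        }
        edges.add(new Edge(source, target, partitioner));
    }

    /** Renders every edge as "upstream -[partitioner]-> downstream". */
    List<String> describeEdges() {
        List<String> lines = new ArrayList<>();
        for (Edge e : edges) {
            lines.add(e.source.operatorName + " -[" + e.partitioner + "]-> " + e.target.operatorName);
        }
        return lines;
    }

    static StreamGraphSketch example() {
        StreamGraphSketch graph = new StreamGraphSketch();
        graph.addNode(1, "Source", 2);
        graph.addNode(2, "Map", 2);
        graph.addNode(3, "Sink", 4);
        graph.addEdge(1, 2, "FORWARD");
        graph.addEdge(2, 3, "REBALANCE");
        return graph;
    }

    public static void main(String[] args) {
        for (String line : example().describeEdges()) {
            System.out.println(line);
        }
    }
}
```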

Distribution Patterns

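Each StreamEdge carries a partitioner that extends StreamPartitioner. The partitioner's isPointwise() method tells the JobGraph generator which distribution pattern to use when connecting producers and consumers (for example, ForwardPartitioner and RescalePartitioner are pointwise). The two patterns are defined by the DistributionPattern enum: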

/**
 * Defines how producer subtasks connect to consumer subtasks.
 *
 * <p>This pattern influences connections between {@link ExecutionVertex}
 * and {@link IntermediateResultPartition} in {@link EdgeManagerBuildUtil}
 */
public enum DistributionPattern {

    /** Every producer subtask connects to all consumer subtasks. */
    ALL_TO_ALL,

    /** Every producer subtask connects to one or more consumer subtasks. */
    POINTWISE
}

ALL_TO_ALL requires each upstream subtask to establish connections with every downstream subtask.

POINTWISE connects each upstream subtask to only a subset (one or more) of the downstream subtasks, so fewer connections are needed than with ALL_TO_ALL.
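The difference between the two patterns is easiest to see by listing the connections. The sketch below is a simplified model, not Flink's EdgeManagerBuildUtil: the contiguous-block assignment used for POINTWISE is an assumption made for illustration, and the exact grouping Flink computes may differ.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Flink code) of producer-to-consumer wiring for both patterns. */
final class DistributionPatternSketch {

    enum Pattern { ALL_TO_ALL, POINTWISE }

    /** Assumed pointwise rule: the larger side is split into contiguous blocks. */
    static boolean pointwiseConnected(int producer, int consumer, int producers, int consumers) {
        if (producers <= consumers) {
            // each producer feeds a contiguous block of consumers
            return consumer * producers / consumers == producer;
        }
        // each consumer reads from a contiguous block of producers
        return producer * consumers / producers == consumer;
    }

    /** Returns, for each producer subtask, the consumer subtask indices it connects to. */
    static List<List<Integer>> connect(Pattern pattern, int producers, int consumers) {
        List<List<Integer>> result = new ArrayList<>();
        for (int p = 0; p < producers; p++) {
            List<Integer> targets = new ArrayList<>();
            for (int c = 0; c < consumers; c++) {
                if (pattern == Pattern.ALL_TO_ALL || pointwiseConnected(p, c, producers, consumers)) {
                    targets.add(c);
                }
            }
            result.add(targets);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(connect(Pattern.ALL_TO_ALL, 2, 3)); // [[0, 1, 2], [0, 1, 2]]
        System.out.println(connect(Pattern.POINTWISE, 2, 4));  // [[0, 1], [2, 3]]
        System.out.println(connect(Pattern.POINTWISE, 4, 2));  // [[0], [0], [1], [1]]
    }
}
```

With 2 producers and 4 consumers, ALL_TO_ALL would need 8 connections while this pointwise wiring needs only 4.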

Building JobGraph

Once StreamGraph construction completes, the system invokes PipelineExecutorUtils.getJobGraph() to generate the JobGraph. The execution path follows this sequence:

-> PipelineExecutorUtils.getJobGraph()
-> FlinkPipelineTranslationUtil.getJobGraph()
-> StreamGraphTranslator.translateToJobGraph()
-> StreamGraph.getJobGraph()
-> StreamingJobGraphGenerator.createJobGraph()

JobGraph represents an optimized version of StreamGraph. When consecutive operators satisfy the chaining criteria, they are merged into a single JobVertex. This optimization is implemented in the setChaining() method of StreamingJobGraphGenerator:

/**
 * Constructs task chains starting from source {@link StreamNode} instances.
 *
 * <p>This method recursively generates all {@link JobVertex} instances.
 */
private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {
    // differentiate between sources functioning as chained inputs versus standalone head operators
    final Map<Integer, OperatorChainInfo> chainEntryPoints =
            buildChainedInputsAndGetHeadInputs(hashes, legacyHashes);
    final Collection<OperatorChainInfo> initialEntryPoints =
            chainEntryPoints.entrySet().stream()
                    .sorted(Comparator.comparing(Map.Entry::getKey))
                    .map(Map.Entry::getValue)
                    .collect(Collectors.toList());

    // iterate over a copy since the map undergoes concurrent modifications
    for (OperatorChainInfo info : initialEntryPoints) {
        createChain(
                info.getStartNodeId(),
                1, // operators start at position 1 because 0 is reserved for chained source inputs
                info,
                chainEntryPoints);
    }
}
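The effect of chaining can be sketched on a linear pipeline. This is a deliberately reduced model, not the recursive createChain() logic, which walks a full DAG: the buildChains helper and its boolean chainability flags are hypothetical and exist only to show how operators collapse into fewer vertices.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Flink code): fuse the operators of a linear pipeline into chains. */
final class ChainGroupingSketch {

    /**
     * @param operators operator names of a linear pipeline, in topological order
     * @param chainableWithPrevious entry i is true when operator i may be fused with
     *     operator i - 1 (entry 0 is ignored)
     * @return one entry per resulting vertex, with chained operators joined by " -> "
     */
    static List<String> buildChains(String[] operators, boolean[] chainableWithPrevious) {
        List<String> vertices = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        for (int i = 0; i < operators.length; i++) {
            if (i > 0 && chainableWithPrevious[i]) {
                // extend the chain that is currently open
                current.append(" -> ").append(operators[i]);
            } else {
                // close the open chain (if any) and start a new one
                if (current.length() > 0) {
                    vertices.add(current.toString());
                }
                current = new StringBuilder(operators[i]);
            }
        }
        if (current.length() > 0) {
            vertices.add(current.toString());
        }
        return vertices;
    }

    public static void main(String[] args) {
        String[] ops = {"Source", "Map", "Filter", "KeyedProcess", "Sink"};
        // assume a keyBy in front of KeyedProcess, which breaks the chain at that point
        boolean[] chainable = {false, true, true, false, true};
        System.out.println(buildChains(ops, chainable));
        // [Source -> Map -> Filter, KeyedProcess -> Sink]
    }
}
```

Five operators end up as two vertices in this example, so only one exchange between vertices remains.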

Chaining Requirements

An upstream operator and its downstream operator qualify for chaining when the edge between them meets all of the following conditions:

1. The downstream node has exactly one input edge
2. The upstream and downstream nodes belong to the same SlotSharingGroup
3. The edge's partitioner is a ForwardPartitioner
4. The edge's exchange mode is not BATCH
5. The upstream and downstream nodes have equal parallelism
6. Chaining is enabled in the StreamGraph configuration
7. The two operators pass the chainability check in areOperatorsChainable()

These conditions ensure that only logically compatible operators get fused together, reducing network overhead and improving task execution efficiency.
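The conditions above can be written as a single predicate. The sketch below is an approximation and not Flink's implementation: the Node and Edge classes are hypothetical, and the chainingAllowed flag is only a placeholder for whatever areOperatorsChainable() checks.

```java
/** Toy model (not Flink code) of the chaining conditions as one boolean predicate. */
final class ChainabilitySketch {

    /** Hypothetical stand-in for a StreamNode, reduced to the fields the check needs. */
    static final class Node {
        final String slotSharingGroup;
        final int parallelism;
        final int inEdgeCount;
        final boolean chainingAllowed; // placeholder for the areOperatorsChainable() result

        Node(String slotSharingGroup, int parallelism, int inEdgeCount, boolean chainingAllowed) {
            this.slotSharingGroup = slotSharingGroup;
            this.parallelism = parallelism;
            this.inEdgeCount = inEdgeCount;
            this.chainingAllowed = chainingAllowed;
        }
    }

    /** Hypothetical stand-in for a StreamEdge. */
    static final class Edge {
        final Node upstream;
        final Node downstream;
        final String partitioner;
        final boolean batchExchange;

        Edge(Node upstream, Node downstream, String partitioner, boolean batchExchange) {
            this.upstream = upstream;
            this.downstream = downstream;
            this.partitioner = partitioner;
            this.batchExchange = batchExchange;
        }
    }

    static boolean isChainable(Edge edge, boolean chainingEnabled) {
        Node up = edge.upstream;
        Node down = edge.downstream;
        return down.inEdgeCount == 1                               // 1. single input edge
                && up.slotSharingGroup.equals(down.slotSharingGroup) // 2. same slot sharing group
                && "FORWARD".equals(edge.partitioner)              // 3. forward partitioning
                && !edge.batchExchange                             // 4. not a batch exchange
                && up.parallelism == down.parallelism              // 5. equal parallelism
                && chainingEnabled                                 // 6. chaining enabled globally
                && up.chainingAllowed && down.chainingAllowed;     // 7. operator-level check
    }

    public static void main(String[] args) {
        Node source = new Node("default", 2, 0, true);
        Node map = new Node("default", 2, 1, true);
        Node sink = new Node("default", 4, 1, true);
        System.out.println(isChainable(new Edge(source, map, "FORWARD", false), true)); // true
        System.out.println(isChainable(new Edge(map, sink, "REBALANCE", false), true)); // false
    }
}
```

Because the conditions are joined with a logical AND, failing any single one (for instance a parallelism mismatch) is enough to keep the two operators in separate vertices.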
