Understanding Apache Flink's Semi and Anti Join Implementation

Tech May 19 2

Introduction to Semi and Anti Joins in Apache Flink

Apache Flink supports semi joins and anti joins in its query processing framework. A semi join returns each left-side row that has at least one match on the right side; an anti join returns each left-side row that has none. These join operations are how SQL features such as IN/NOT IN subqueries and EXISTS/NOT EXISTS predicates are implemented. Flink's implementation is not limited to equality-based joins: it also supports join conditions that include non-equality predicates.
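To make the semantics concrete, here is a minimal Python sketch of what a semi join and an anti join compute. It is not Flink code: the helper names semi_join and anti_join, the tuple rows, and the sample data are assumptions made for illustration, and SQL NULL handling is deliberately ignored.

```python
def semi_join(left, right, cond):
    """Keep each left row that has at least one matching right row."""
    right_rows = list(right)
    return [l for l in left if any(cond(l, r) for r in right_rows)]


def anti_join(left, right, cond):
    """Keep each left row that has no matching right row."""
    right_rows = list(right)
    return [l for l in left if not any(cond(l, r) for r in right_rows)]


# Hypothetical sample data: two-column rows on both sides.
left = [(1, 10), (2, 20), (3, 5)]
right = [(1, 7), (1, 8), (3, 9)]


def cond(l, r):
    # Equality on the first column plus a non-equality predicate on the second.
    return l[0] == r[0] and l[1] > r[1]


semi_result = semi_join(left, right, cond)
anti_result = anti_join(left, right, cond)
```

Note that the row (1, 10) matches two right rows but appears only once in the semi join result, which is the main difference from an inner join.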

Implementation Architecture

Flink's architecture for semi and anti joins is built on dedicated operators and code generators such as SemiHashJoinOperator, AntiHashJoinOperator, and NestedLoopJoinCodeGenerator. These components enable Flink to handle a wide range of subquery scenarios, including:

  • Converting standard IN/NOT IN and EXISTS/NOT EXISTS predicates to semi/anti joins
  • Transforming correlated IN/NOT IN subqueries into semi/anti join operations
  • Handling multiple join conditions in correlated IN or EXISTS subqueries

Conversion Conditions and Limitations

Flink imposes strict conditions for converting filter subqueries to semi/anti joins. The conversion occurs only when the conditions form a conjunction, that is, predicates connected by AND. This restriction exists for two primary reasons:

  1. When extracting filter conditions from correlated subqueries, predicates connected by AND can be moved directly into the outer semi join's join condition without changing semantics.
  2. Semi/anti joins represent the presence or absence of matching records in the right table. When the subquery is combined with another predicate by OR, the overall expression is true if either condition holds, so a left row can qualify without any match on the right. That does not align with semi/anti join semantics.
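The second reason can be checked on a small example. The Python sketch below uses hypothetical tables l(a, c) and r(d) and plain list comprehensions rather than anything from Flink. It compares an AND filter and an OR filter against a semi join on a = d.

```python
# Hypothetical sample data: l has columns (a, c), r has a single column (d).
l_rows = [(1, 9), (2, 9), (1, 3)]
r_rows = [(1,), (7,)]


def in_subquery(a):
    # a IN (SELECT d FROM r)
    return any(a == d for (d,) in r_rows)


# WHERE a IN (SELECT d FROM r) AND c > 5
and_filter = [row for row in l_rows if in_subquery(row[0]) and row[1] > 5]

# WHERE a IN (SELECT d FROM r) OR c > 5
or_filter = [row for row in l_rows if in_subquery(row[0]) or row[1] > 5]

# Semi join on a = d, followed by the remaining predicate c > 5.
semi = [row for row in l_rows if any(row[0] == d for (d,) in r_rows)]
semi_then_c = [row for row in semi if row[1] > 5]
```

With AND, the semi join followed by the remaining predicate gives the same rows as the original filter. With OR, the row (2, 9) must be returned even though it has no match in r, and a semi join has already discarded it.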

SQL to Logical Plan Transformation

Consider a SQL query with an IN subquery:

SELECT * FROM l WHERE a IN (SELECT d FROM r WHERE l.b > r.e)

This query is transformed into the following logical plan:

LogicalJoin(condition=[AND(=($0, $3), >($1, $4))], joinType=[semi])
:- LogicalTableScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]])
+- LogicalProject(inputs=[0..1])
   +- LogicalFilter(condition=[true])
      +- LogicalTableScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]])
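As a sanity check on how to read this plan, the Python sketch below evaluates the query in two ways over hypothetical sample rows: directly as written in SQL, and following the plan's shape. In the plan, $0..$2 refer to the fields of l and $3..$4 to the projected fields (d, e) of r. The data and variable names are assumptions, and NULLs are not modeled.

```python
# Hypothetical sample data matching the schemas l(a, b, c) and r(d, e, f).
l = [(1, 5, "x"), (2, 1, "y"), (3, 9, "z")]
r = [(1, 2, "p"), (2, 4, "q"), (3, 9, "s")]

# Direct reading of:
#   SELECT * FROM l WHERE a IN (SELECT d FROM r WHERE l.b > r.e)
direct = [
    (a, b, c)
    for (a, b, c) in l
    if any(a == d for (d, e, f) in r if b > e)
]

# Plan reading: project r to its first two fields, then semi join.
projected_r = [(d, e) for (d, e, f) in r]


def condition(joined):
    # AND(=($0, $3), >($1, $4)) over the concatenated left + right row.
    return joined[0] == joined[3] and joined[1] > joined[4]


planned = [
    l_row
    for l_row in l
    if any(condition(l_row + r_row) for r_row in projected_r)
]
```

Both readings select the same rows, which illustrates that the correlated predicate l.b > r.e has simply become part of the join condition.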

Similarly, an EXISTS subquery:

SELECT a FROM l u WHERE EXISTS (SELECT * FROM r WHERE r.e = u.b)

Transforms to:

LogicalJoin(condition=[=($3, $1)], joinType=[semi])
:- LogicalTableScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]])
+- LogicalProject(exprs=[[$1]])
   +- LogicalFilter(condition=[true])
      +- LogicalTableScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]])

Optimization Rules

Flink employs a collection of optimization rules to attempt subquery conversion to semi/anti joins. When conversion isn't possible, Flink falls back to Calcite's subquery elimination and decorrelation mechanisms (SubQueryRemoveRule and RelDecorrelator.decorrelateQuery).

The core transformation logic resides in the FlinkSubQueryRemoveRule.FILTER optimization rule. This rule:

  1. Identifies Filter RelNodes containing subqueries
  2. Attempts to convert these subqueries to semi/anti joins
  3. Preserves the logical nature of the transformed nodes
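The three steps above can be sketched as a toy rewrite function. This is not the code of FlinkSubQueryRemoveRule: the classes Pred, SubQuery, And, Or, the nested-tuple plan representation, and the function rewrite_filter are all hypothetical, chosen only to show the control flow of "convert if the subquery sits under AND, otherwise give up and let the fallback handle it".

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Pred:
    text: str               # ordinary predicate, e.g. "c > 5"


@dataclass(frozen=True)
class SubQuery:
    table: str              # table scanned by an IN / EXISTS subquery
    negated: bool = False   # True for NOT IN / NOT EXISTS


@dataclass(frozen=True)
class And:
    operands: tuple


@dataclass(frozen=True)
class Or:
    operands: tuple


def conjuncts(cond):
    """Flatten nested ANDs into a list of conjuncts."""
    if isinstance(cond, And):
        return [c for op in cond.operands for c in conjuncts(op)]
    return [cond]


def contains_subquery(cond):
    if isinstance(cond, SubQuery):
        return True
    if isinstance(cond, (And, Or)):
        return any(contains_subquery(op) for op in cond.operands)
    return False


def rewrite_filter(input_table, cond):
    """Return a plan as nested tuples, or None to signal the fallback path."""
    plan = ("scan", input_table)
    remaining = []
    for c in conjuncts(cond):
        if isinstance(c, SubQuery):
            join_type = "anti" if c.negated else "semi"
            plan = ("join", join_type, plan, ("scan", c.table))
        elif contains_subquery(c):
            return None     # subquery nested under OR: not convertible here
        else:
            remaining.append(c.text)
    return ("filter", tuple(remaining), plan) if remaining else plan


converted = rewrite_filter("l", And((SubQuery("r"), Pred("c > 5"))))
fallback = rewrite_filter("l", Or((SubQuery("r"), Pred("c > 5"))))
anti = rewrite_filter("l", SubQuery("r", negated=True))
```

The result is still a logical plan (a join node, optionally under a filter), which matches the third step: the rewrite changes the shape of the tree but does not pick a physical operator.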

Physical Operator Selection

During the physical planning phase, Flink selects appropriate join operators based on the logical plan. For batch processing, the BatchPhysicalHashJoin operator is commonly used. Its translateToExecNode method ultimately calls HashJoinOperator.newHashJoinOperator to create the specific join operator based on the join type.
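The idea of creating a specific operator from the join type can be illustrated with a small hash-based sketch. The factory new_hash_join below is hypothetical and only shares its general shape with the Flink method named above. It handles a single equality key, builds on the right side, and ignores NULL keys and non-equality conditions.

```python
def new_hash_join(join_type):
    """Hypothetical factory: return a join function for the given join type."""
    if join_type not in ("semi", "anti"):
        raise ValueError(f"unsupported join type: {join_type}")

    def run(left, right, left_key, right_key):
        # Build phase: collect the distinct join keys of the right side.
        build = {right_key(r) for r in right}
        # Probe phase: emit left rows depending on whether their key was seen.
        if join_type == "semi":
            return [l for l in left if left_key(l) in build]
        return [l for l in left if left_key(l) not in build]

    return run


# Hypothetical sample data.
left = [(1, "a"), (2, "b"), (3, "c")]
right = [(2,), (3,), (3,)]


def first(row):
    return row[0]


semi_rows = new_hash_join("semi")(left, right, first, first)
anti_rows = new_hash_join("anti")(left, right, first, first)
```

Because only the existence of a key matters, the build side can be reduced to a set of keys, and duplicate right rows such as the two (3,) entries do not multiply the output.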

Comparison with Other Systems

Flink's implementation differs from other systems in several ways:

  • Dremio-oss: Lacks dedicated semi/anti join optimizations and relies solely on Calcite's subquery elimination and decorrelator.
  • Presto: Primarily supports converting non-correlated IN subqueries to semi joins. The resulting semi join produces only a marker indicating whether left-side data appears in the right side, requiring additional filter and project operations.
  • Calcite: Currently lacks anti join conversion rules but can use SemiJoinRule to convert qualifying inner and left joins to semi joins.
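The marker-style semi join described for Presto can be contrasted with a filtering semi join in a few lines of Python. The data and variable names are hypothetical, and this is a sketch of the two result shapes rather than of either engine's implementation.

```python
# Hypothetical sample data: left rows (key, value) and the right side's keys.
left = [(1, "a"), (2, "b"), (3, "c")]
right_keys = {2, 3}

# Marker style: every left row survives and gains a boolean match column.
marked = [(k, v, k in right_keys) for (k, v) in left]

# A separate filter keeps rows whose marker is true ...
filtered = [row for row in marked if row[2]]
# ... and a separate projection drops the marker column again.
projected = [(k, v) for (k, v, _) in filtered]

# Filtering style: the semi join itself emits only the matching left rows.
direct = [(k, v) for (k, v) in left if k in right_keys]
```

Both paths end with the same rows; the marker style just spreads the work over three operators instead of one.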

NULL Handling in Anti Joins

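When converting NOT EXISTS or NOT IN subqueries to anti joins, special attention must be given to NULL-aware equality conditions. Under SQL's three-valued logic, NOT IN evaluates to UNKNOWN rather than TRUE when the subquery result contains a NULL and no match is found, so a plain anti join would return rows that NOT IN excludes. Anti joins must correctly handle NULL values to maintain semantic correctness.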
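The difference is easy to see by modeling SQL's three-valued logic directly. In the Python sketch below, None stands for both NULL and UNKNOWN; the helpers not_in and not_exists_eq are hypothetical and model standard SQL semantics, not Flink internals.

```python
def not_in(value, candidates):
    """Three-valued NOT IN: returns True, False, or None (UNKNOWN)."""
    if not candidates:
        return True                     # NOT IN over an empty result is TRUE
    if value is None:
        return None                     # NULL compared to anything is UNKNOWN
    if value in [c for c in candidates if c is not None]:
        return False                    # a definite match
    if any(c is None for c in candidates):
        return None                     # no match, but a NULL could have matched
    return True


def not_exists_eq(value, candidates):
    """NOT EXISTS (SELECT ... WHERE r.d = value): NULL = x is never TRUE."""
    return not any(
        value is not None and c is not None and value == c
        for c in candidates
    )


# Hypothetical sample data: one column on each side, with NULLs.
left = [1, 2, None]
right = [2, None]

# A WHERE clause keeps a row only when the predicate is TRUE.
not_in_rows = [a for a in left if not_in(a, right) is True]
not_exists_rows = [a for a in left if not_exists_eq(a, right)]
```

With a NULL on the right side, NOT IN returns no rows at all, while NOT EXISTS returns 1 and NULL. An anti join that stands in for NOT IN therefore needs the NULL-aware behavior, whereas the NOT EXISTS form corresponds to the ordinary anti join.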

Conclusion

Semi and anti joins are specialized join operations that enable efficient implementation of common SQL patterns. Flink's implementation uses custom operators that handle a broader range of scenarios than many other systems. The conversion process involves multiple optimization rules, with fallback mechanisms that keep queries executable even when semi/anti join conversion isn't possible.
