Understanding Apache Flink's Semi and Anti Join Implementation
Introduction to Semi and Anti Joins in Apache Flink
Apache Flink provides robust support for semi joins and anti joins in its query processing framework. These specialized join operations are essential for implementing SQL features like IN/NOT IN subqueries and EXISTS/NOT EXISTS predicates. Flink's implementation goes beyond basic equality-based joins, supporting more complex conditions including non-equality predicates.
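The semantics of the two operations can be illustrated with a minimal sketch in plain Python. This is not Flink code: the helper names `semi_join` and `anti_join`, the sample rows, and the dictionary row format are assumptions made for illustration only.

```python
def semi_join(left, right, cond):
    """Keep each left row that has at least one matching right row.

    A left row is emitted at most once, no matter how many right rows match.
    """
    right_rows = list(right)
    return [l for l in left if any(cond(l, r) for r in right_rows)]


def anti_join(left, right, cond):
    """Keep each left row that has no matching right row."""
    right_rows = list(right)
    return [l for l in left if not any(cond(l, r) for r in right_rows)]


# Hypothetical sample data (no NULLs, to keep the example simple).
l_rows = [{"a": 1, "b": 10}, {"a": 2, "b": 20}, {"a": 3, "b": 30}]
r_rows = [{"d": 1, "e": 5}, {"d": 1, "e": 7}, {"d": 3, "e": 99}]

# Equality condition, as in "a IN (SELECT d FROM r)".
equal = lambda l, r: l["a"] == r["d"]
# Equality combined with a non-equality predicate.
equal_and_greater = lambda l, r: l["a"] == r["d"] and l["b"] > r["e"]

semi_eq = semi_join(l_rows, r_rows, equal)
anti_eq = anti_join(l_rows, r_rows, equal)
semi_mixed = semi_join(l_rows, r_rows, equal_and_greater)
```

Because the condition is an arbitrary function of the two rows, the same sketch covers both equality and non-equality predicates; only the way matches are found efficiently (hashing versus nested loops) would differ in a real engine.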
Implementation Architecture
Flink's architecture for semi and anti joins is built on dedicated runtime components such as SemiHashJoinOperator, AntiHashJoinOperator, and NestedLoopJoinCodeGenerator. These components enable Flink to handle a wide range of subquery scenarios, including:
- Converting standard IN/NOT IN and EXISTS/NOT EXISTS predicates to semi/anti joins
- Transforming correlated IN/NOT IN subqueries into semi/anti join operations
- Handling multiple join conditions in correlated IN or EXISTS subqueries
Conversion Conditions and Limitations
Flink imposes strict conditions for converting filter subqueries to semi/anti joins. The conversion only occurs when all conditions are in conjunctive normal form (predicates connected by AND). This restriction exists for two primary reasons:
- When filter conditions are extracted from correlated subqueries, predicates in conjunctive normal form can be moved directly into the outer semi join's join condition without changing semantics.
- A semi/anti join keeps a left row purely on the presence or absence of a matching record in the right table. If the subquery predicate is combined with another condition by OR, a left row can qualify through the other condition alone, which a semi/anti join by itself cannot express.
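The AND-only restriction can be sketched as a small check over a predicate tree. This is a toy model, not the logic of Flink's rules: the tuple-based predicate encoding and the names `split_conjuncts`, `contains_subquery`, and `can_convert` are hypothetical.

```python
# Predicates are modelled as tuples: ("AND", p1, p2, ...), ("OR", p1, p2, ...),
# ("NOT", p), a subquery predicate such as ("IN", "a", "SELECT d FROM r"),
# or a plain comparison such as ("CMP", "b > 10"). This encoding is an assumption.
SUBQUERY_OPS = {"IN", "NOT IN", "EXISTS", "NOT EXISTS"}


def split_conjuncts(pred):
    """Flatten nested ANDs into a list of top-level conjuncts."""
    if pred[0] == "AND":
        out = []
        for child in pred[1:]:
            out.extend(split_conjuncts(child))
        return out
    return [pred]


def contains_subquery(pred):
    """True if a subquery predicate appears anywhere inside pred."""
    if pred[0] in SUBQUERY_OPS:
        return True
    if pred[0] in ("AND", "OR", "NOT"):
        return any(contains_subquery(child) for child in pred[1:])
    return False


def can_convert(pred):
    """True when every subquery predicate is itself a top-level conjunct."""
    for conjunct in split_conjuncts(pred):
        if conjunct[0] not in SUBQUERY_OPS and contains_subquery(conjunct):
            return False  # subquery hidden under OR/NOT: not a pure conjunction
    return True


in_pred = ("IN", "a", "SELECT d FROM r")
and_pred = ("AND", in_pred, ("CMP", "b > 10"))
or_pred = ("OR", in_pred, ("CMP", "b > 10"))
nested_and = ("AND", ("CMP", "c = 1"), ("AND", ("EXISTS", "SELECT * FROM r"), in_pred))
```

In this model, `and_pred` and `nested_and` are eligible because each subquery predicate can be peeled off as its own semi/anti join, while `or_pred` is not, since a row may satisfy the filter without any match in the right table.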
SQL to Logical Plan Transformation
Consider a SQL query with an IN subquery:
SELECT * FROM l WHERE a IN (SELECT d FROM r WHERE l.b > r.e)
This query is transformed into the following logical plan:
LogicalJoin(condition=[AND(=($0, $3), >($1, $4))], joinType=[semi])
:- LogicalTableScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]])
+- LogicalProject(inputs=[0..1])
   +- LogicalFilter(condition=[true])
      +- LogicalTableScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]])
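To see why the rewritten plan is equivalent to the original query, the following sketch evaluates the IN subquery directly and then as a semi join with the combined condition `$0 = $3 AND $1 > $4`. It is a plain-Python illustration with hypothetical sample rows and no NULL values; the function names are assumptions, not Flink APIs.

```python
# Hypothetical rows: l has columns (a, b, c), r has columns (d, e, f).
l_rows = [(1, 10, "x"), (2, 20, "y"), (3, 30, "z")]
r_rows = [(1, 5, "p"), (2, 25, "q"), (3, 30, "s"), (3, 7, "t")]


def in_subquery(l_rows, r_rows):
    """Evaluate: SELECT * FROM l WHERE a IN (SELECT d FROM r WHERE l.b > r.e)."""
    out = []
    for a, b, c in l_rows:
        # The correlated subquery is re-evaluated for each outer row.
        sub = [d for d, e, _f in r_rows if b > e]
        if a in sub:
            out.append((a, b, c))
    return out


def semi_join_plan(l_rows, r_rows):
    """Evaluate the plan: semi join on $0 = $3 AND $1 > $4 over a projection of r."""
    projected = [(d, e) for d, e, _f in r_rows]  # LogicalProject(inputs=[0..1])
    out = []
    for left in l_rows:
        for right in projected:
            row = left + right  # fields $0..$2 come from l, $3..$4 from r
            if row[0] == row[3] and row[1] > row[4]:
                out.append(left)
                break  # semi join: emit the left row once, then stop probing
    return out


direct = in_subquery(l_rows, r_rows)
via_join = semi_join_plan(l_rows, r_rows)
```

Both evaluations return the same rows here because the correlation predicate `l.b > r.e` is ANDed with the IN equality, so it can move into the join condition unchanged.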
Similarly, an EXISTS subquery:
SELECT a FROM l u WHERE EXISTS (SELECT * FROM r WHERE r.e = u.b)
Transforms to:
LogicalJoin(condition=[=($3, $1)], joinType=[semi])
:- LogicalTableScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]])
+- LogicalProject(exprs=[[$1]])
   +- LogicalFilter(condition=[true])
      +- LogicalTableScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]])
Optimization Rules
Flink employs a collection of optimization rules to attempt subquery conversion to semi/anti joins. When conversion isn't possible, Flink falls back to Calcite's subquery elimination and decorrelation mechanisms (SubQueryRemoveRule and RelDecorrelator.decorrelateQuery).
The core transformation logic resides in the FlinkSubQueryRemoveRule.FILTER optimization rule. This rule:
- Identifies Filter RelNodes containing subqueries
- Attempts to convert these subqueries to semi/anti joins
- Keeps the result in logical form (for example, a LogicalJoin), leaving physical operator selection to a later planning phase
Physical Operator Selection
During the physical planning phase, Flink selects appropriate join operators based on the logical plan. For batch processing, the BatchPhysicalHashJoin node is commonly used. Its translateToExecNode method ultimately leads to a call to HashJoinOperator.newHashJoinOperator, which creates the specific join operator for the join type.
Comparison with Other Systems
Flink's implementation differs from other systems in several ways:
- Dremio-oss: Lacks dedicated semi/anti join optimizations and relies solely on Calcite's subquery elimination and decorrelator.
- Presto: Primarily supports converting non-correlated IN subqueries to semi joins. The resulting semi join produces only a marker indicating whether left-side data appears in the right side, requiring additional filter and project operations.
- Calcite: Currently lacks anti join conversion rules but can use SemiJoinRule to convert qualifying inner and left joins to semi joins.
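The marker-style semi join described for Presto can be contrasted with a filtering semi join in a short sketch. This is a simplified model of the behaviour described above, not Presto code; the function names and the `match` column name are hypothetical.

```python
def marker_semi_join(left_rows, right_keys, key):
    """Return every left row, extended with a boolean marker column."""
    keys = set(right_keys)
    return [dict(row, match=(row[key] in keys)) for row in left_rows]


def filter_and_project(marked_rows):
    """Keep rows whose marker is True, then drop the marker column."""
    return [
        {k: v for k, v in row.items() if k != "match"}
        for row in marked_rows
        if row["match"]
    ]


left = [{"a": 1}, {"a": 2}, {"a": 3}]
marked = marker_semi_join(left, [1, 3, 3], "a")
result = filter_and_project(marked)
```

The marker variant keeps all left rows and defers the decision to later operators, which is why an extra filter and projection are needed before the output matches what a filtering semi join emits directly.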
NULL Handling in Anti Joins
When converting NOT EXISTS or NOT IN subqueries to anti joins, special attention must be given to NULL-aware equality conditions. Anti joins must correctly handle NULL values to maintain semantic correctness.
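The difference NULLs make can be shown with a sketch based on standard SQL three-valued logic, using Python's `None` for NULL. It models uncorrelated, single-column NOT IN and NOT EXISTS only and is not Flink's operator code; both function names are assumptions.

```python
def not_in_anti_join(left_keys, right_keys):
    """Rows kept by: WHERE k NOT IN (SELECT key FROM right)."""
    right = list(right_keys)
    if not right:
        # NOT IN over an empty set is TRUE for every row, even a NULL key.
        return list(left_keys)
    if any(k is None for k in right):
        # With a NULL on the right, the predicate is FALSE (a match exists)
        # or UNKNOWN (no match, but the NULL comparison is undecided).
        return []
    present = set(right)
    # A NULL left key compares as UNKNOWN against a non-empty set: dropped.
    return [k for k in left_keys if k is not None and k not in present]


def not_exists_anti_join(left_keys, right_keys):
    """Rows kept by: WHERE NOT EXISTS (SELECT * FROM right WHERE right.key = k)."""
    present = {k for k in right_keys if k is not None}
    # NULL = anything is never TRUE, so NULL keys never find a match: kept.
    return [k for k in left_keys if k is None or k not in present]


left_keys = [1, 2, None]
no_nulls = not_in_anti_join(left_keys, [2, 3])
null_on_right = not_in_anti_join(left_keys, [2, None])
empty_right = not_in_anti_join(left_keys, [])
not_exists = not_exists_anti_join(left_keys, [2, None])
```

The same inputs give different results for the two forms, which is why a NOT IN conversion needs a NULL-aware anti join while NOT EXISTS can use an ordinary one.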
Conclusion
Semi and anti joins are specialized join operations that enable efficient implementation of common SQL patterns. Flink's implementation provides comprehensive support with custom operators that handle a broader range of scenarios than many other systems. The conversion process involves multiple optimization rules, with fallback mechanisms ensuring robust query processing even when semi/anti join conversion is not possible.