Understanding Flink SQL for Dynamic Stream Processing
Flink SQL Overview
Table API and SQL are the highest-level APIs in Flink. The two are tightly integrated: SQL queries are executed against Flink's Table abstraction, so they are often treated as a single unified layer. Because Flink unifies batch and stream processing, both batch jobs (using the DataSet API) and streaming jobs (using the DataStream API) can be implemented directly with the Table API or SQL at the application level. For a given table, an identical query yields exactly the same result whether it is expressed through the Table API or as SQL.
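For instance, assuming a registered table named ClickStream with userId and pageUrl columns (the names are illustrative), the same aggregation can be written either way; the equivalence of the two forms is the point, not the specific schema:

// Requires: import static org.apache.flink.table.api.Expressions.$;

// SQL form:
Table viaSql = tableEnv.sqlQuery(
    "SELECT userId, COUNT(pageUrl) AS visitCount FROM ClickStream GROUP BY userId");

// Equivalent Table API form:
Table viaTableApi = tableEnv.from("ClickStream")
    .groupBy($("userId"))
    .select($("userId"), $("pageUrl").count().as("visitCount"));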
It's important to note that the Table API and SQL were initially underdeveloped. They changed significantly after Alibaba's internal Blink fork was merged into Flink 1.9, and rapid development and refinement followed, with the core functionality largely stabilized by Flink 1.12. Even in the 1.17 release, however, the Table API and SQL remain in an evolutionary state, with interfaces and usage patterns subject to ongoing adjustment. The focus should therefore be on understanding core principles and fundamental usage, while specific API calls should be verified against the official documentation for current best practices.
The SQL API is built atop the Apache Calcite framework, adhering to SQL standards. This allows for developing and running Flink jobs using pure SQL.
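A minimal sketch of such a pure-SQL job, assuming the built-in datagen and print connectors that ship with Flink (the table names and schema are illustrative):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PureSqlJob {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Source: the datagen connector produces an unbounded stream of random rows.
        tableEnv.executeSql(
            "CREATE TABLE ClickStream (userId STRING, pageUrl STRING) " +
            "WITH ('connector' = 'datagen')");

        // Sink: the print connector writes results to stdout.
        tableEnv.executeSql(
            "CREATE TABLE ClickCounts (userId STRING, visitCount BIGINT) " +
            "WITH ('connector' = 'print')");

        // The entire job is expressed as a single SQL statement.
        tableEnv.executeSql(
            "INSERT INTO ClickCounts " +
            "SELECT userId, COUNT(pageUrl) FROM ClickStream GROUP BY userId");
    }
}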
Tables in Stream Processing
Relational tables and SQL were designed primarily for batch processing, so comparing them with the requirements of stream processing reveals inherent conceptual differences. This section delves into the concept of tables within a streaming context.
Stream processing deals with continuous, unbounded data, which fundamentally alters the nature of "tables" compared to static relational database tables. Consequently, query operations executed on such tables carry new semantics.
Dynamic Tables
Dynamic Tables are the central abstraction in Flink's Table API and SQL for enabling relational semantics over streaming data. Traditional tables used in batch processing are static, representing a fixed data set. In contrast, a Dynamic Table is mutable; its contents evolve over time as new data arrives.
Continuous Queries
Dynamic Tables can be queried similarly to static batch tables. However, because the underlying data is continuously changing, an SQL query defined on a dynamic table cannot execute once and produce a final result. Instead, such a query runs perpetually, updating its results as new input data becomes available. This type of never-ending query is termed a Continuous Query. Any query defined on a dynamic table is a continuous query, and its result is itself another dynamic table.
Conceptually, each incoming data element triggers a query evaluation. The query at any moment operates on a "snapshot" of the input dynamic table, representing all data received up to that point, akin to a finite batch. The arrival of streaming data triggers a continuous sequence of these snapshot queries, creating an animated, evolving result.
The workflow for continuous querying involves three key steps:
- A data stream is converted into a dynamic table.
- A continuous query runs on this dynamic table, producing a new dynamic table as its result.
- The resulting dynamic table is converted back into a data stream.
By encapsulating the conversions between streams and dynamic tables, the API allows SQL queries to be executed directly on data streams, effectively applying table-oriented processing to streaming data.
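As a sketch of this full round trip, assuming a small in-memory stream of (userId, pageUrl) pairs (the schema, values, and view name are all illustrative):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.$;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Step 1: convert a data stream into a dynamic table.
DataStream<Tuple2<String, String>> clicks = env.fromElements(
    Tuple2.of("Alice", "./home"), Tuple2.of("Bob", "./cart"), Tuple2.of("Alice", "./prod"));
tableEnv.createTemporaryView("ClickStream", clicks, $("userId"), $("pageUrl"));

// Step 2: run a continuous query on the dynamic table.
Table result = tableEnv.sqlQuery(
    "SELECT userId, COUNT(pageUrl) AS visitCount FROM ClickStream GROUP BY userId");

// Step 3: convert the resulting dynamic table back into a data stream.
DataStream<Row> resultStream = tableEnv.toChangelogStream(result);
resultStream.print();
env.execute();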
Converting Streams to Dynamic Tables
When viewing a stream as a table, each arriving data record corresponds to an Insert operation, appending a new row to the table. Since a stream is continuous and past outputs cannot be modified (only appended to), the table is effectively built from an insert-only changelog stream.
Update Queries
Consider a continuous query that counts page visits per user:

Table userClickCount = tableEnv
    .sqlQuery("SELECT userId, COUNT(pageUrl) AS visitCount FROM ClickStream GROUP BY userId");
As new records are continuously inserted into the source dynamic table (ClickStream), the query result userClickCount undergoes ongoing modifications. Since aggregated counts can increase, these modifications may be new insertions or updates to previously emitted rows. This type of continuous query is an Update Query. To convert a result table from an update query into a DataStream, the toChangelogStream() method must be invoked.
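Continuing the snippet above, a sketch of that conversion; the commented output uses Flink's changelog row kinds (+I for an insert, -U for retracting the old row, +U for its replacement), with values that are purely illustrative:

// The result table is updating, so toChangelogStream() is required;
// toDataStream() would throw an exception for this table.
DataStream<Row> changelog = tableEnv.toChangelogStream(userClickCount);
changelog.print();
// Illustrative output as Alice's clicks arrive:
//   +I[Alice, 1]   first click inserts a new row
//   -U[Alice, 1]   second click retracts the old count...
//   +U[Alice, 2]   ...and emits the updated count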
Append Queries
If a query involves grouping and aggregation, its result table will include update operations. A simple filter query, however, like the example below, produces a result table that, like the source table, changes only through insert operations.
Table aliceClicks = tableEnv
.sqlQuery("SELECT pageUrl, userId FROM ClickStream WHERE userId = 'Alice'");
Such a continuous query is termed an Append Query; the changelog stream for its result table contains only INSERT operations. Queries over tumbling or hopping windows also typically produce append-only results: each window's aggregate is computed and written once, when the window closes, and previously emitted window results are never revised. For an append query result, conversion to a DataStream can be done using the toDataStream() method.
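A short sketch, reusing the aliceClicks table from above:

// Append-only results can use the simpler toDataStream() conversion.
DataStream<Row> aliceStream = tableEnv.toDataStream(aliceClicks);
aliceStream.print(); // every record is a plain insertion, e.g. +I[./home, Alice]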
Converting Dynamic Tables to Streams
Similar to tables in relational databases, dynamic tables undergo continuous modifications via Insert, Update, and Delete operations. Converting a dynamic table to a stream or writing it to an external system requires encoding these change operations into a stream of messages that instruct the downstream system what to do. Flink's Table API and SQL support three encoding modes:
- Append-only Stream: A dynamic table modified solely through insert operations can be directly converted to an append-only stream. Each emitted element corresponds to a newly added row in the table.
- Retract Stream: A retract stream contains two types of messages: Add messages and Retract messages. The encoding rules are: INSERT operations become Add messages. DELETE operations become Retract messages. UPDATE operations are encoded as a Retract message for the old row and an Add message for the new row. This encoding allows a dynamic table to be converted into a retract stream representing all changes.
- Upsert Stream: An upsert stream contains Upsert messages and Delete messages. "Upsert" combines "update" and "insert". In this encoding, both INSERT and UPDATE operations are encoded as Upsert messages (where a key determines if it's an insert or update), while DELETE operations are encoded as Delete messages.
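For intuition, here is an illustrative comparison of the two updating encodings, assuming Alice's visit count changes from 1 to 2; the notation mirrors Flink's changelog row kinds:

// Retract encoding: two messages per update.
//   -U[Alice, 1]   retract the old row
//   +U[Alice, 2]   add the new row
// Upsert encoding: one message per update, keyed on userId.
//   +U[Alice, 2]   the key "Alice" alone identifies the row to overwrite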
When converting a dynamic table to a DataStream within Flink code, the append-only and retract encodings are used: toDataStream() requires an insert-only table, while toChangelogStream() emits a retract-style changelog by default. When connecting to external systems, the supported encoding depends on the characteristics of the specific sink; a key-based store, for instance, can consume an upsert stream.
Time Attributes in Flink SQL
Time-based operations, such as time windows, require a clear definition of time semantics and of where timing information comes from. In the Table API and SQL, tables can be augmented with a logical time attribute field used specifically to indicate time during table processing.
A time attribute is thus an integral part of a table's schema. It can be defined directly as a field in the DDL when creating a table or specified when converting a DataStream to a table. Once defined, the time attribute can be referenced like a regular column and used in time-based operations.
Event Time
Event time is the time embedded within the data records themselves, typically representing when an event occurred at its source. Using event time allows for correct handling of out-of-order events. To define an event time attribute, a watermarking strategy must be declared to specify how to derive event timestamps and handle lateness.
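A sketch of an event-time attribute defined in DDL, assuming the table is created via executeSql (the schema, connector, and five-second bound are illustrative):

// The WATERMARK clause both designates eventTime as the event-time
// attribute and generates watermarks that trail the maximum observed
// timestamp by 5 seconds, bounding how long out-of-order events are waited for.
tableEnv.executeSql(
    "CREATE TABLE ClickEvents (" +
    "  userId STRING," +
    "  pageUrl STRING," +
    "  eventTime TIMESTAMP(3)," +
    "  WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND" +
    ") WITH ('connector' = 'datagen')");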
Processing Time
Processing time refers to the system time of the machine executing the operation (the Flink task manager). It is the simplest time notion, requiring no timestamp extraction or watermark generation. Defining a processing time attribute typically involves declaring a virtual column that is populated by the system's current time during processing.
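A corresponding sketch for processing time, again with an illustrative schema:

// PROCTIME() declares a computed (virtual) column: it is not stored in the
// data but materializes the machine's system time as each row is processed.
tableEnv.executeSql(
    "CREATE TABLE ClickEvents (" +
    "  userId STRING," +
    "  pageUrl STRING," +
    "  procTime AS PROCTIME()" +
    ") WITH ('connector' = 'datagen')");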