Understanding Group and Over Aggregations in Flink SQL
1. Group Aggregation
1.1 Basic Concepts
Group aggregation in Flink (supported for both Batch and Streaming tasks) is a method of grouping data based on specific categories such as age, gender, or other dimensions. This differs from window aggregation, which groups data based on time intervals. While window aggregation operates vertically over time, group aggregation operates horizontally across data categories.
1.2 Window Aggregation vs. Group Aggregation
Application scenarios: Group aggregation is typically used to categorize data before applying aggregate functions like COUNT, SUM, etc.
One might wonder whether window aggregation can be converted to group aggregation simply by using time as the GROUP BY key, and what the essential differences between the two approaches are.
Consider how a 1-minute window aggregation can be rewritten as a group aggregation:
Window Aggregation Example (Tumbling Window):
-- Source table
CREATE TABLE source_table (
dimension_data STRING,
user_id BIGINT,
price BIGINT,
event_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dimension_data.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
);
-- Sink table
CREATE TABLE output_table (
dimension_data STRING,
page_views BIGINT,
total_price BIGINT,
max_price BIGINT,
min_price BIGINT,
unique_users BIGINT,
window_start bigint
) WITH (
'connector' = 'print'
);
-- Processing logic
insert into output_table
select dimension_data,
count(*) as page_views,
sum(price) as total_price,
max(price) as max_price,
min(price) as min_price,
count(distinct user_id) as unique_users,
UNIX_TIMESTAMP(CAST(tumble_start(event_time, interval '1' minute) AS STRING)) * 1000 as window_start
from source_table
group by
dimension_data,
tumble(event_time, interval '1' minute)
Converted Group Aggregation:
-- Source table
CREATE TABLE source_table (
dimension_data STRING,
user_id BIGINT,
price BIGINT,
event_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dimension_data.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
);
-- Sink table
CREATE TABLE output_table (
dimension_data STRING,
page_views BIGINT,
total_price BIGINT,
max_price BIGINT,
min_price BIGINT,
unique_users BIGINT,
window_start bigint
) WITH (
'connector' = 'print'
);
-- Processing logic
insert into output_table
select dimension_data,
count(*) as page_views,
sum(price) as total_price,
max(price) as max_price,
min(price) as min_price,
count(distinct user_id) as unique_users,
cast((UNIX_TIMESTAMP(CAST(event_time AS STRING))) / 60 as bigint) as window_start
from source_table
group by
dimension_data,
cast((UNIX_TIMESTAMP(CAST(event_time AS STRING))) / 60 as bigint)
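The bucketing expression in the GROUP BY above can be checked with a quick sketch (plain Python, not Flink code): integer-dividing an epoch-second timestamp by 60 assigns every event to the same bucket that a 1-minute tumbling window would.

```python
# Sketch of the GROUP BY expression above: UNIX_TIMESTAMP(...) / 60
# cast to BIGINT is integer division, i.e. a 1-minute bucket index.
def minute_bucket(epoch_seconds: int) -> int:
    return epoch_seconds // 60

# Two events inside the same minute share a bucket...
assert minute_bucket(120) == minute_bucket(130) == 2
# ...while an event in the next minute falls into the next bucket.
assert minute_bucket(180) == 3
```

Note that this bucket index is a minute count, not the millisecond window start emitted by the tumbling-window version.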
While this conversion is technically valid, the fundamental differences between window and group aggregation are:
- Essential Difference: Window aggregation has time semantics: once a window closes and its results are emitted, late-arriving data can no longer change those results (unless allowed lateness is configured). Group aggregation has no time semantics; any incoming record, no matter how late, triggers a retraction of the previous result and emission of an updated one.
- Execution Level: Window aggregation is time-driven, with computation triggered by time (the watermark). Group aggregation is entirely data-driven, with each new record triggering computation and result emission.
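The data-driven retract/emit behavior can be sketched in a few lines of Python (a simplified model of the changelog stream, not Flink's actual runtime):

```python
# Simplified model of group aggregation's changelog output:
# every incoming record updates state and re-emits the result for its key.
def group_count(keys):
    state, changelog = {}, []
    for key in keys:
        if key in state:
            changelog.append(("-U", key, state[key]))  # retract old result
            state[key] += 1
            changelog.append(("+U", key, state[key]))  # emit updated result
        else:
            state[key] = 1
            changelog.append(("+I", key, 1))           # first result for key
    return changelog

# Each record, however late, triggers retraction plus re-emission:
print(group_count(["a", "a", "b"]))
# → [('+I', 'a', 1), ('-U', 'a', 1), ('+U', 'a', 2), ('+I', 'b', 1)]
```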
1.3 SQL Semantics
In a streaming context with a Kafka source and target, the SQL generates three operators:
- Source Operator: Continuously reads data from Kafka and sends it to the group aggregation operator. The shuffle strategy is based on GROUP BY keys, with identical keys routed to the same subtask.
- Group Aggregation Operator: Receives records, checks state for previous results, computes new values, updates state, and emits results. For existing keys, it first retracts previous results before emitting new ones. For new keys, it directly emits the computed result.
- Sink Operator: Receives results and writes them to the target Kafka table. All operators remain running continuously in a streaming environment.
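The key-based shuffle between the source and the aggregation operator can be illustrated with a deterministic hash partitioner (an illustration only; Flink's actual key-group assignment differs):

```python
import zlib

# Illustrative hash shuffle (not Flink's actual key-group partitioner):
# records with the same GROUP BY key always reach the same subtask.
def subtask_for(key: str, parallelism: int) -> int:
    return zlib.crc32(key.encode()) % parallelism

# Every occurrence of a key lands on one subtask, so that subtask's
# local state holds the complete aggregate for the key.
assert subtask_for("user_42", 4) == subtask_for("user_42", 4)
assert 0 <= subtask_for("user_42", 4) < 4
```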
1.4 Advanced Grouping Features
Group aggregation supports GROUPING SETS, ROLLUP, and CUBE. Here's an example using GROUPING SETS:
SELECT
supplier_id
, rating
, product_id
, COUNT(*)
FROM (VALUES
('supplier1', 'product1', 4),
('supplier1', 'product2', 3),
('supplier2', 'product3', 3),
('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SETS (
( supplier_id, product_id, rating ),
( supplier_id, product_id ),
( supplier_id, rating ),
( supplier_id ),
( product_id, rating ),
( product_id ),
( rating ),
( )
)
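Semantically, GROUPING SETS behaves like a UNION ALL of one GROUP BY per listed set, with the columns outside each set replaced by NULL. A Python sketch of that semantics (not Flink's implementation) over the same VALUES data:

```python
from collections import Counter

# Same rows as the VALUES clause above: (supplier_id, product_id, rating).
rows = [("supplier1", "product1", 4),
        ("supplier1", "product2", 3),
        ("supplier2", "product3", 3),
        ("supplier2", "product4", 4)]
cols = {"supplier_id": 0, "product_id": 1, "rating": 2}

def grouping_sets_count(rows, sets):
    # One COUNT(*) per grouping set; columns outside the set become None.
    result = []
    for gset in sets:
        groups = Counter(
            tuple(row[cols[c]] if c in gset else None for c in cols)
            for row in rows)
        result.extend((key, cnt) for key, cnt in groups.items())
    return result

# (supplier_id) groups two rows per supplier; () yields the grand total.
assert (("supplier1", None, None), 2) in grouping_sets_count(rows, [("supplier_id",)])
assert ((None, None, None), 4) in grouping_sets_count(rows, [()])
```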
2. Over Aggregation
Over aggregation (supported for both Batch and Streaming) can be understood as a special type of sliding window aggregation function.
The key difference between Over aggregation and window aggregation is:
- Window aggregation: Fields not in GROUP BY cannot be directly selected.
- Over aggregation: Preserves original fields while computing aggregates.
Note: Although over aggregation is also available in batch systems such as Hive, it is used relatively infrequently in streaming production environments.
Application Scenarios
Over aggregation is useful for calculating aggregate results over a sliding window of recent data. For example, calculating the sum of order amounts for each product over the last hour.
Practical Example
SELECT order_id, order_time, amount,
SUM(amount) OVER (
PARTITION BY product
ORDER BY order_time
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
) AS one_hour_product_amount_sum
FROM Orders
Over Aggregation Syntax
SELECT
agg_func(agg_col) OVER (
[PARTITION BY col1[, col2, ...]]
ORDER BY time_col
range_definition),
...
FROM ...
- ORDER BY: Must be a time attribute column (event time or processing time).
- PARTITION BY: Defines the aggregation granularity (e.g., by product).
- range_definition: Specifies the aggregation window range, either by number of rows or by time interval.
2.1 Time-Based Aggregation
Time-based aggregation creates a sliding time window. The following example calculates the sum of amounts over the last hour for each product:
CREATE TABLE source_table (
order_id BIGINT,
product BIGINT,
amount BIGINT,
order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),
WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.order_id.min' = '1',
'fields.order_id.max' = '2',
'fields.amount.min' = '1',
'fields.amount.max' = '10',
'fields.product.min' = '1',
'fields.product.max' = '2'
);
CREATE TABLE sink_table (
product BIGINT,
order_time TIMESTAMP(3),
amount BIGINT,
one_hour_product_amount_sum BIGINT
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT product, order_time, amount,
SUM(amount) OVER (
PARTITION BY product
ORDER BY order_time
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
) AS one_hour_product_amount_sum
FROM source_table
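The RANGE clause's behavior can be sketched as follows: for each row, sum the amounts of all rows in the same partition whose timestamp falls within the preceding hour, up to and including the current row (illustrative Python, assuming rows arrive ordered by time per partition):

```python
# Sketch of RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW.
def one_hour_sum(rows, hour=3600):
    # rows: list of (product, epoch_seconds, amount), ordered by time.
    out = []
    for i, (product, t, _amount) in enumerate(rows):
        s = sum(a for p, u, a in rows[: i + 1]
                if p == product and t - hour <= u <= t)
        out.append(s)
    return out

# Third row is more than an hour after the first two, so they drop out:
print(one_hour_sum([(1, 0, 5), (1, 1800, 7), (1, 7200, 3)]))  # → [5, 12, 3]
```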
2.2 Row-Based Aggregation
Row-based aggregation creates a sliding window over a fixed number of rows. The following example sums, for each product, the amounts of the current row and up to five preceding rows:
CREATE TABLE source_table (
order_id BIGINT,
product BIGINT,
amount BIGINT,
order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),
WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.order_id.min' = '1',
'fields.order_id.max' = '2',
'fields.amount.min' = '1',
'fields.amount.max' = '2',
'fields.product.min' = '1',
'fields.product.max' = '2'
);
CREATE TABLE sink_table (
product BIGINT,
order_time TIMESTAMP(3),
amount BIGINT,
five_row_product_amount_sum BIGINT
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT product, order_time, amount,
SUM(amount) OVER (
PARTITION BY product
ORDER BY order_time
ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
) AS five_row_product_amount_sum
FROM source_table
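The ROWS clause can be sketched with a bounded per-partition buffer: each output covers the current row plus at most five preceding rows of the same partition (illustrative Python, not Flink internals):

```python
from collections import defaultdict, deque

# Sketch of ROWS BETWEEN 5 PRECEDING AND CURRENT ROW per partition:
# a bounded buffer of the current row plus up to 5 preceding rows.
def rows_window_sum(rows, preceding=5):
    # rows: list of (product, amount), already ordered by time.
    buffers = defaultdict(lambda: deque(maxlen=preceding + 1))
    out = []
    for product, amount in rows:
        buffers[product].append(amount)
        out.append(sum(buffers[product]))
    return out

# Partitions accumulate independently:
print(rows_window_sum([(1, 1), (1, 2), (2, 10), (1, 3)]))  # → [1, 3, 10, 6]
```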
Simplified Syntax for Multiple Over Windows
Flink SQL supports a simplified syntax when using multiple aggregation windows in a single SELECT statement:
SELECT order_id, order_time, amount,
SUM(amount) OVER w AS sum_amount,
AVG(amount) OVER w AS avg_amount
FROM Orders
WINDOW w AS (
PARTITION BY product
ORDER BY order_time
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW)