Understanding Group and Over Aggregations in Flink SQL

  1. Group Aggregation

1.1 Basic Concepts

Group aggregation in Flink (supported for both Batch and Streaming tasks) is a method of grouping data based on specific categories such as age, gender, or other dimensions. This differs from window aggregation, which groups data based on time intervals. While window aggregation operates vertically over time, group aggregation operates horizontally across data categories.
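
As a minimal sketch of the idea (the user_behavior table and its columns here are hypothetical, invented purely for illustration):

-- Minimal group aggregation sketch; the user_behavior table and its
-- columns are hypothetical, for illustration only.
SELECT gender,
    COUNT(*) AS page_views,    -- number of rows in each gender group
    SUM(price) AS total_price  -- total spend in each gender group
FROM user_behavior
GROUP BY gender;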

1.2 Window Aggregation vs. Group Aggregation

Application scenarios: Group aggregation is typically used to categorize data before applying aggregate functions like COUNT, SUM, etc.

One might wonder: if a window aggregation can be converted into a group aggregation simply by using time as a GROUP BY key, what are the essential differences between the two approaches?

Consider the following example, which converts a 1-minute window aggregation into an equivalent group aggregation:

Window Aggregation Example (Tumbling Window):

-- Source table
CREATE TABLE source_table (
    dimension_data STRING,
    user_id BIGINT,
    price BIGINT,
    event_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.dimension_data.length' = '1',
  'fields.user_id.min' = '1',
  'fields.user_id.max' = '100000',
  'fields.price.min' = '1',
  'fields.price.max' = '100000'
);

-- Sink table
CREATE TABLE output_table (
    dimension_data STRING,
    page_views BIGINT,
    total_price BIGINT,
    max_price BIGINT,
    min_price BIGINT,
    unique_users BIGINT,
    window_start BIGINT
) WITH (
  'connector' = 'print'
);

-- Processing logic
insert into output_table
select dimension_data,
    count(*) as page_views,
    sum(price) as total_price,
    max(price) as max_price,
    min(price) as min_price,
    count(distinct user_id) as unique_users,
    UNIX_TIMESTAMP(CAST(tumble_start(event_time, interval '1' minute) AS STRING)) * 1000  as window_start
from source_table
group by
    dimension_data,
    tumble(event_time, interval '1' minute)

Converted Group Aggregation:

-- Source table
CREATE TABLE source_table (
    dimension_data STRING,
    user_id BIGINT,
    price BIGINT,
    event_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.dimension_data.length' = '1',
  'fields.user_id.min' = '1',
  'fields.user_id.max' = '100000',
  'fields.price.min' = '1',
  'fields.price.max' = '100000'
);

-- Sink table
CREATE TABLE output_table (
    dimension_data STRING,
    page_views BIGINT,
    total_price BIGINT,
    max_price BIGINT,
    min_price BIGINT,
    unique_users BIGINT,
    window_start BIGINT
) WITH (
  'connector' = 'print'
);

-- Processing logic
insert into output_table
select dimension_data,
    count(*) as page_views,
    sum(price) as total_price,
    max(price) as max_price,
    min(price) as min_price,
    count(distinct user_id) as unique_users,
    cast((UNIX_TIMESTAMP(CAST(event_time AS STRING))) / 60 as bigint) as window_start
from source_table
group by
    dimension_data,
    cast((UNIX_TIMESTAMP(CAST(event_time AS STRING))) / 60 as bigint)

While this conversion is technically valid, the fundamental differences between window and group aggregation are:

  • Essential Difference: Window aggregation has time semantics: once a window is closed and its results are emitted, subsequent late data won't alter those results (allowLateness aside). Group aggregation lacks time semantics; any incoming data, no matter how late, triggers a retraction of the previous result and the emission of an updated one (illustrated after this list).
  • Execution Level: Window aggregation is time-bound, with computation triggered by time (Watermark). Group aggregation is entirely data-driven, with each new record triggering computation and result emission.
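
To make the data-driven behavior concrete, below is an illustrative sketch (made-up values, not captured output) of the changelog a group aggregation emits to a print sink as rows for one key arrive. +I marks an initial insert, -U retracts the previous result, and +U emits the updated one:

-- Illustrative only: changelog for key 'a' as two source rows arrive
-- (window_start column omitted for brevity).
+I[a, 1, 100, 100, 100, 1]   -- first row for key 'a': initial result
-U[a, 1, 100, 100, 100, 1]   -- second row arrives: retract the old result
+U[a, 2, 250, 150, 100, 2]   -- emit the updated aggregate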

1.3 SQL Semantics

In a streaming context (imagine the same query running with a Kafka source and sink), the SQL translates into three operators:

  • Source Operator: Continuously reads data from Kafka and sends it to the group aggregation operator. The shuffle strategy is based on GROUP BY keys, with identical keys routed to the same subtask.
  • Group Aggregation Operator: Receives records, checks state for previous results, computes new values, updates state, and emits results. For existing keys, it first retracts previous results before emitting new ones. For new keys, it directly emits the computed result.
  • Sink Operator: Receives results and writes them to the target Kafka table. All operators remain running continuously in a streaming environment.
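
You can inspect this operator topology with Flink SQL's EXPLAIN statement. A sketch against the tables defined above, using a simplified version of the query (the exact plan text varies across Flink versions):

-- The printed plan should contain a source scan, a GroupAggregate keyed
-- by dimension_data, and a sink node (node names vary by version).
EXPLAIN
SELECT dimension_data,
    count(*) AS page_views,
    sum(price) AS total_price
FROM source_table
GROUP BY dimension_data;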

1.4 Advanced Grouping Features

Group aggregation supports GROUPING SETS, ROLLUP, and CUBE. Here's an example using GROUPING SETS:

SELECT 
    supplier_id
    , rating
    , product_id
    , COUNT(*)
FROM (VALUES
    ('supplier1', 'product1', 4),
    ('supplier1', 'product2', 3),
    ('supplier2', 'product3', 3),
    ('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SETS (
    ( supplier_id, product_id, rating ),
    ( supplier_id, product_id         ),
    ( supplier_id,             rating ),
    ( supplier_id                     ),
    (              product_id, rating ),
    (              product_id         ),
    (                          rating ),
    (                                 )
)
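
Since the eight grouping sets above enumerate every subset of the three columns, the same query can be expressed more compactly with CUBE:

-- Equivalent shorthand: CUBE expands to all 2^3 = 8 combinations of
-- the listed columns.
GROUP BY CUBE (supplier_id, product_id, rating)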

  2. Over Aggregation

Over aggregation (supported for both Batch and Streaming) can be understood as a special type of sliding window aggregation function.

The key difference between Over aggregation and window aggregation is:

  • Window aggregation: Fields not in the GROUP BY clause cannot be directly selected.
  • Over aggregation: Preserves original fields while computing aggregates.

Note: Although over aggregation is also available in systems like Hive, it is used relatively infrequently in production environments.

Application Scenarios

Over aggregation is useful for calculating aggregate results over a sliding window of recent data. For example, calculating the sum of order amounts for each product over the last hour.

Practical Example

SELECT order_id, order_time, amount,
  SUM(amount) OVER (
    PARTITION BY product
    ORDER BY order_time
    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
  ) AS one_hour_product_amount_sum
FROM Orders

Over Aggregation Syntax

SELECT
  agg_func(agg_col) OVER (
    [PARTITION BY col1[, col2, ...]]
    ORDER BY time_col
    range_definition),
  ...
FROM ...

  • ORDER BY: Must be a timestamp column (event time or processing time).
  • PARTITION BY: Defines the aggregation granularity (e.g., by product).
  • range_definition: Specifies the aggregation window range, either by number of rows or time interval.

2.1 Time-Based Aggregation

Time-based aggregation creates a sliding time window. The following example calculates the sum of amounts over the last hour for each product:

CREATE TABLE source_table (
    order_id BIGINT,
    product BIGINT,
    amount BIGINT,
    order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),
    WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.order_id.min' = '1',
  'fields.order_id.max' = '2',
  'fields.amount.min' = '1',
  'fields.amount.max' = '10',
  'fields.product.min' = '1',
  'fields.product.max' = '2'
);

CREATE TABLE sink_table (
    product BIGINT,
    order_time TIMESTAMP(3),
    amount BIGINT,
    one_hour_product_amount_sum BIGINT
) WITH (
  'connector' = 'print'
);

INSERT INTO sink_table
SELECT product, order_time, amount,
  SUM(amount) OVER (
    PARTITION BY product
    ORDER BY order_time
    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
  ) AS one_hour_product_amount_sum
FROM source_table

2.2 Row-Based Aggregation

Row-based aggregation creates a sliding window over a fixed number of rows. The following example sums amounts over the current row and the 5 preceding rows for each product (note that 5 PRECEDING plus the current row spans six rows in total):

CREATE TABLE source_table (
    order_id BIGINT,
    product BIGINT,
    amount BIGINT,
    order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),
    WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.order_id.min' = '1',
  'fields.order_id.max' = '2',
  'fields.amount.min' = '1',
  'fields.amount.max' = '2',
  'fields.product.min' = '1',
  'fields.product.max' = '2'
);

CREATE TABLE sink_table (
    product BIGINT,
    order_time TIMESTAMP(3),
    amount BIGINT,
    five_row_product_amount_sum BIGINT
) WITH (
  'connector' = 'print'
);

INSERT INTO sink_table
SELECT product, order_time, amount,
  SUM(amount) OVER (
    PARTITION BY product
    ORDER BY order_time
    ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
  ) AS five_row_product_amount_sum
FROM source_table

Simplified Syntax for Multiple Over Windows

Flink SQL supports a simplified syntax when using multiple aggregation windows in a single SELECT statement:

SELECT order_id, order_time, amount,
  SUM(amount) OVER w AS sum_amount,
  AVG(amount) OVER w AS avg_amount
FROM Orders
WINDOW w AS (
  PARTITION BY product
  ORDER BY order_time
  RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW)
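
This is purely syntactic shorthand; the equivalent long form repeats the full OVER definition for each aggregate:

-- Equivalent query without the WINDOW clause: each aggregate repeats
-- the same window definition.
SELECT order_id, order_time, amount,
  SUM(amount) OVER (
    PARTITION BY product
    ORDER BY order_time
    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS sum_amount,
  AVG(amount) OVER (
    PARTITION BY product
    ORDER BY order_time
    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS avg_amount
FROM Orders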

Tags: Flink SQL
