Optimizing Apache Flink for Large-Scale Stream Processing at Kuaishou

Tech | May 13

Evolution of Stream Processing Systems

Stream processing focuses on handling unbounded data streams in real time, delivering results with minimal latency. To understand Flink's role, we trace the evolution of data systems:

  • Early Batch Era: Google's MapReduce (paper published in 2004) and Apache Hadoop popularized large-scale batch processing.
  • Pipeline Improvements: Google Flume introduced pipeline-based processing to connect multiple MapReduce jobs efficiently.
  • Early Streaming: Apache Storm (2011) offered low latency but lacked strong consistency guarantees during failover.
  • Lambda Architecture: Combined fast but imprecise stream processing with accurate batch layers. It was powerful but complex to maintain.
  • Micro-Batch Era: Spark Streaming treated streams as small batches (micro-batches), providing production-grade consistency.
  • Messaging Foundations: Apache Kafka emerged as a distributed log, enabling data replay and serving as the backbone for modern stream architectures.
  • Modern Streaming: Apache Flink matured around 2016, adopting concepts from Google Dataflow (like event time and watermarks) to provide true stateful stream processing with exactly-once semantics.

Batch vs. Stream Processing

Core Challenges in Streaming

Designing a stream processor requires answering four questions, summarized in the Google Dataflow model as What, Where, When, and How (sometimes loosely called the "4 Ws"):

  1. What: What computation is being performed? (e.g., Map, Aggregate, Join).
  2. Where: Where in event time is the data scoped? Typically handled via Windows (Tumbling, Sliding, Session).
  3. When: When are results materialized? Controlled by Watermarks (for event time progression) and Triggers.
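  4. How: How do later results for a window relate to earlier ones?
    • Discarding: Each firing emits only the data that arrived since the previous firing; earlier state is discarded.
    • Accumulating: Each firing emits the full result so far, which supersedes the previously emitted one.
    • Accumulating & Retracting: Like accumulating, but each update first emits an "undo" (retraction) of the previously emitted result, followed by an "insert" of the updated result.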
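To make the three modes concrete, here is a small plain-Java sketch (an illustration of the semantics, not Flink API code) that replays one window firing twice, first after 3 events and then after 5 more, and lists what a downstream consumer would receive in each mode. Encoding a retraction as a negated value is an assumption made for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model (not a Flink API): what a downstream consumer receives from one
// window that fires several times; paneCounts[i] is the number of events that arrived
// between firing i-1 and firing i.
final class AccumulationModes {
    private AccumulationModes() {}

    // Discarding: each firing reports only the events since the previous firing.
    static List<Long> discarding(long[] paneCounts) {
        List<Long> out = new ArrayList<>();
        for (long c : paneCounts) {
            out.add(c);
        }
        return out;
    }

    // Accumulating: each firing reports the running total, which replaces the old result.
    static List<Long> accumulating(long[] paneCounts) {
        List<Long> out = new ArrayList<>();
        long total = 0;
        for (long c : paneCounts) {
            total += c;
            out.add(total);
        }
        return out;
    }

    // Accumulating & retracting: each update is preceded by a retraction of the previous
    // total (encoded here as its negation), so summing everything downstream stays correct.
    static List<Long> accumulatingAndRetracting(long[] paneCounts) {
        List<Long> out = new ArrayList<>();
        long total = 0;
        for (int i = 0; i < paneCounts.length; i++) {
            if (i > 0) {
                out.add(-total); // "undo" the previously emitted result
            }
            total += paneCounts[i];
            out.add(total);      // "insert" the updated result
        }
        return out;
    }

    public static void main(String[] args) {
        long[] panes = {3, 5}; // 3 events before the first firing, 5 more before the second
        System.out.println(discarding(panes));                // [3, 5]
        System.out.println(accumulating(panes));              // [3, 8]
        System.out.println(accumulatingAndRetracting(panes)); // [3, -3, 8]
    }
}
```

Summing everything emitted in the retracting mode (3 - 3 + 8) yields the correct total of 8, which is why retractions matter when downstream operators aggregate the results further.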

Key Flink Technologies

Distributed State & Exactly-Once

Flink keeps state (e.g., sums, counts, window contents) local to the operator instances that use it. Its core capability is exactly-once state semantics: each event affects the state exactly once, even when failures occur. Checkpointing, described below, provides this guarantee on top of two building blocks:

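  • State Backends: Keeping working state as objects on the JVM heap (HashMapStateBackend) or in an embedded RocksDB instance on local disk (EmbeddedRocksDBStateBackend, named RocksDBStateBackend in older Flink versions).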
  • Event Time: Processing based on when events actually occurred, rather than when they are processed.

Checkpointing Mechanism

Flink takes consistent snapshots with a lightweight mechanism called Checkpointing. Special records called Barriers are injected into the data stream and flow along with the regular records, marking which records belong to which checkpoint.

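  1. The JobManager triggers a checkpoint: each Source records its current position (e.g., Kafka offset 100) and injects a barrier into its output.
  2. When an operator receives the barrier (from all of its inputs, if it has several), it snapshots its current state (e.g., currentSum = 500) and forwards the barrier downstream.
  3. Once every operator, including the Sink, has acknowledged its snapshot to the JobManager, the checkpoint is complete.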

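If a failure occurs, Flink restores the state from the last completed checkpoint. The Source rewinds to offset 100 and the operator resets currentSum to 500, so every record after offset 100 is replayed and affects the state exactly once. End-to-end exactly-once output additionally requires transactional or idempotent sinks; otherwise, results emitted before the failure may be written again.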
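The recovery argument can be reduced to a toy model. The sketch below is plain Java rather than Flink code, and CheckpointDemo and its methods are hypothetical: the source position and the operator state are snapshotted together, and a failure rolls both back, so replayed records are applied to the state exactly once.

```java
import java.util.List;

// Toy model (not Flink code): a source reading an offset-addressed log and a stateful
// "sum" operator. A checkpoint captures (offset, sum) together; recovery restores both.
final class CheckpointDemo {
    private final List<Integer> log; // stands in for a Kafka partition
    private int offset;              // source position
    private long currentSum;         // operator state
    private int checkpointOffset;    // last completed checkpoint
    private long checkpointSum;

    CheckpointDemo(List<Integer> log) {
        this.log = log;
    }

    // Consume up to n records starting at the current offset.
    void process(int n) {
        for (int i = 0; i < n && offset < log.size(); i++) {
            currentSum += log.get(offset++);
        }
    }

    // The barrier passes: source offset and operator state are snapshotted consistently.
    void checkpoint() {
        checkpointOffset = offset;
        checkpointSum = currentSum;
    }

    // Failure: roll back both the source position and the state.
    void restore() {
        offset = checkpointOffset;
        currentSum = checkpointSum;
    }

    long sum() {
        return currentSum;
    }

    public static void main(String[] args) {
        CheckpointDemo job = new CheckpointDemo(List.of(1, 2, 3, 4, 5, 6));
        job.process(3);   // sum = 6
        job.checkpoint(); // snapshot: offset 3, sum 6
        job.process(2);   // sum = 15, then the job "crashes"
        job.restore();    // back to offset 3, sum 6
        job.process(10);  // replays 4, 5, 6
        System.out.println(job.sum()); // 21, the same as a failure-free run
    }
}
```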

Event Time and Watermarks

In the real world, events arrive out of order. Flink uses Watermarks to track event-time progress: a watermark with timestamp T asserts that no more events with timestamps at or before T are expected. When the watermark passes the end of a window, the window fires and its result is computed.

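Events that arrive after the watermark has already passed their window are late. Flink can still accept them within a configured allowed lateness, re-firing the window with an updated result (downstream consumers may then need retractions to correct the earlier output), or route them to a side output instead of dropping them.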
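The following single-threaded sketch models these rules in plain Java; it is not the Flink API, and WatermarkDemo is a hypothetical name. It assumes a bounded-out-of-orderness watermark of "max timestamp seen minus allowed delay minus 1" (the formula behind Flink's built-in bounded-out-of-orderness strategy), tumbling windows, and zero allowed lateness, so late events are simply collected in a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of event-time windowing (not the Flink API). Assumptions: tumbling windows
// [start, start + windowSize), watermark = max timestamp seen - maxDelay - 1, no allowed lateness.
final class WatermarkDemo {
    private final long windowSize;
    private final long maxDelay;
    private long watermark = Long.MIN_VALUE;
    private final TreeMap<Long, Long> openWindows = new TreeMap<>(); // start -> event count
    final List<String> fired = new ArrayList<>();
    final List<Long> late = new ArrayList<>();

    WatermarkDemo(long windowSize, long maxDelay) {
        this.windowSize = windowSize;
        this.maxDelay = maxDelay;
    }

    void onEvent(long ts) {
        long start = ts - Math.floorMod(ts, windowSize);
        if (start + windowSize - 1 <= watermark) {
            late.add(ts); // the watermark already passed this event's window
        } else {
            openWindows.merge(start, 1L, Long::sum);
        }
        // The watermark only moves forward, even when an out-of-order event arrives.
        watermark = Math.max(watermark, ts - maxDelay - 1);
        // Fire (and discard) every window whose last timestamp the watermark has reached.
        while (!openWindows.isEmpty() && openWindows.firstKey() + windowSize - 1 <= watermark) {
            Map.Entry<Long, Long> w = openWindows.pollFirstEntry();
            fired.add("[" + w.getKey() + "," + (w.getKey() + windowSize) + ")=" + w.getValue());
        }
    }

    public static void main(String[] args) {
        WatermarkDemo demo = new WatermarkDemo(10, 5);
        for (long ts : new long[]{1, 4, 12, 8, 16, 3, 25}) {
            demo.onEvent(ts);
        }
        System.out.println(demo.fired); // [[0,10)=3, [10,20)=2]
        System.out.println(demo.late);  // [3]
    }
}
```

With a window size of 10 and a delay of 5 (arbitrary time units), the out-of-order event at 8 is still counted because the watermark is only at 6 when it arrives, whereas the event at 3 arrives after the watermark has reached 10 and is treated as late.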

Window Types

  • Tumbling Windows: Fixed-size, non-overlapping (e.g., 5-second buckets).
  • Sliding Windows: Fixed-size but overlapping (e.g., 10-second window sliding every 5 seconds).
  • Session Windows: Groups events by periods of activity separated by gaps of inactivity.
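Window assignment is simple arithmetic on timestamps. The plain-Java sketch below covers the three types above, assuming millisecond timestamps and ignoring window offsets and time zones; WindowAssignment is a hypothetical helper, not a Flink class. The demo reuses the 5-second tumbling and 10-second/5-second sliding examples, plus an assumed 3-second session gap.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of window assignment; windows are half-open intervals [start, end) in ms.
final class WindowAssignment {
    // Tumbling: each timestamp falls into exactly one window.
    static long tumblingStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    // Sliding: every window of length `size` whose start is a multiple of `slide` and contains ts.
    static List<Long> slidingStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        for (long start = ts - Math.floorMod(ts, slide); start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    // Session: a new session begins when the gap since the previous event exceeds `gap`;
    // each session ends `gap` after its last event. Timestamps must be sorted.
    static List<long[]> sessions(long[] sortedTs, long gap) {
        List<long[]> result = new ArrayList<>();
        if (sortedTs.length == 0) {
            return result;
        }
        long start = sortedTs[0];
        long last = sortedTs[0];
        for (int i = 1; i < sortedTs.length; i++) {
            if (sortedTs[i] - last > gap) {
                result.add(new long[]{start, last + gap});
                start = sortedTs[i];
            }
            last = sortedTs[i];
        }
        result.add(new long[]{start, last + gap});
        return result;
    }

    public static void main(String[] args) {
        System.out.println(tumblingStart(12_000, 5_000));         // 10000
        System.out.println(slidingStarts(12_000, 10_000, 5_000)); // [10000, 5000]
        for (long[] s : sessions(new long[]{1_000, 2_000, 9_000, 9_500}, 3_000)) {
            System.out.println(s[0] + " - " + s[1]);              // 1000 - 5000, then 9000 - 12500
        }
    }
}
```

A timestamp therefore belongs to exactly one tumbling window but to size / slide sliding windows (two here), while session windows have no fixed length at all.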

Kuaishou Production Practices

Real-Time UV (Unique Visitor) Calculation

Calculating UV (the number of unique device IDs) is challenging because of the massive scale (millions of QPS) and strict latency requirements. Kuaishou addressed this with a custom ID dictionary and careful state management.

1. Global Dictionary for ID Mapping

Storing raw string IDs (like UUIDs) in Flink state is memory-intensive. Kuaishou built a distributed dictionary inside Flink to map strings to compact Long integers.

Logic:

  1. Partitioning: Hash the device ID to a specific partition index.
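  2. ID Generation: Each partition maintains its own counter. When a new ID arrives, the partition assigns PartitionID * Offset + Counter++, where Offset is the size of the ID range reserved for each partition, so IDs generated by different partitions never collide.
  3. State Storage: Store the deviceId -> Long mapping in KeyedState. Because Flink redistributes keyed state by key when parallelism changes, the job can rescale easily.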
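A minimal sketch of this dictionary logic in plain Java follows. The HashMaps stand in for Flink keyed state, and the class name, the partition count, and the per-partition range size (RANGE, playing the role of Offset) are hypothetical values chosen for illustration; how Kuaishou persists the counters is not described in the source.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the partitioned ID dictionary. Partition p owns the disjoint ID range
// [p * RANGE, (p + 1) * RANGE) and a private counter, so partitions never coordinate.
// HashMaps stand in for Flink keyed state; RANGE and the partition count are hypothetical.
final class DeviceIdDictionary {
    static final long RANGE = 1L << 40; // hypothetical "Offset": IDs reserved per partition
    private final int partitions;
    private final long[] counters;
    private final List<Map<String, Long>> mappings = new ArrayList<>();

    DeviceIdDictionary(int partitions) {
        this.partitions = partitions;
        this.counters = new long[partitions];
        for (int i = 0; i < partitions; i++) {
            mappings.add(new HashMap<>());
        }
    }

    // Step 1: hash the device ID to a partition index.
    int partitionOf(String deviceId) {
        return Math.floorMod(deviceId.hashCode(), partitions);
    }

    // Steps 2-3: reuse the stored mapping, or assign p * RANGE + counter++ and store it.
    long idFor(String deviceId) {
        int p = partitionOf(deviceId);
        return mappings.get(p).computeIfAbsent(deviceId, k -> p * RANGE + counters[p]++);
    }

    public static void main(String[] args) {
        DeviceIdDictionary dict = new DeviceIdDictionary(4);
        long a = dict.idFor("device-a");
        long b = dict.idFor("device-b");
        System.out.println(a == dict.idFor("device-a")); // true: the mapping is stable
        System.out.println(a != b);                      // true: IDs never collide
    }
}
```

Because each ID encodes its partition (id / RANGE), any partition can verify ownership without a global lock, which is what makes the scheme cheap at high QPS.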

2. Handling Data Skew

Certain keys (e.g., celebrity accounts) generate massive traffic, causing "hot partitions."

Solution: Local Aggregation & Scatter

Instead of sending all data to one node:

  1. Add a random suffix to the key (e.g., user_123_01, user_123_02).
  2. Perform a local aggregation (sum) on these scattered keys.
  3. Remove the suffix and perform a final global aggregation.
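The three steps can be sketched in plain Java as follows. The fan-out of 4 and the two-digit suffix format are assumptions, and in a Flink job the partial and final aggregations would be separate keyBy stages running on different subtasks rather than two HashMaps.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

// Two-phase "salted" aggregation modeled in plain Java: phase 1 spreads a hot key over
// SALTS sub-keys, phase 2 strips the suffix and merges the partial results.
final class SaltedAggregation {
    static final int SALTS = 4; // hypothetical fan-out per key

    // Step 1: append a random two-digit suffix, e.g. "user_123" -> "user_123_02".
    static String salt(String key) {
        return String.format("%s_%02d", key, ThreadLocalRandom.current().nextInt(SALTS));
    }

    // Step 3 helper: drop the suffix appended by salt().
    static String unsalt(String saltedKey) {
        return saltedKey.substring(0, saltedKey.lastIndexOf('_'));
    }

    // Counts events per key using the two aggregation phases.
    static Map<String, Long> countByKey(Iterable<String> keys) {
        Map<String, Long> partial = new HashMap<>(); // step 2: partial counts per salted key
        for (String key : keys) {
            partial.merge(salt(key), 1L, Long::sum);
        }
        Map<String, Long> total = new HashMap<>();   // step 3: final global aggregation
        partial.forEach((saltedKey, count) -> total.merge(unsalt(saltedKey), count, Long::sum));
        return total;
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>(Collections.nCopies(1_000, "user_123"));
        events.addAll(Collections.nCopies(10, "user_456"));
        Map<String, Long> counts = countByKey(events);
        System.out.println(counts.get("user_123") + " " + counts.get("user_456")); // 1000 10
    }
}
```

Random suffixes are fine for sums and counts. For distinct counts such as UV, the scatter key should instead be derived from the ID itself (for example, a hash of the device ID modulo the fan-out), so every occurrence of a device lands in the same bucket and is deduplicated there.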

3. Incremental UV with Bitmaps

To calculate daily UV updated every minute without storing all raw IDs:

  • Keep a global (cross-window) state holding a bitmap of the mapped IDs.
  • Define a 1-minute tumbling window solely to trigger computation.
  • Every minute, add the IDs seen in the last minute to the bitmap and output the bitmap's cardinality.
  • At the end of each day, clear the state so the next day's count starts from zero.

Accelerating Failover Recovery

For high SLA requirements (99.99% uptime), recovery speed is critical. Kuaishou implemented strategies to reduce recovery time from minutes to seconds.

1. Container Crash Recovery

Normally, requesting a new YARN container takes 30-60 seconds. Kuaishou optimized this by:

  • Redundant Containers: Keeping "standby" TaskManagers idle in the cluster. If a worker dies, a standby immediately takes over the state recovery.
  • Proactive Allocation: Upon detecting a failure, immediately requesting a new container in the background while the standby takes over.

This reduced recovery time to under 20 seconds.
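A hypothetical model of this standby strategy in plain Java is shown below. StandbyPool, its methods, and the String container handles are illustrative inventions, not Flink or YARN APIs; the Executor parameter lets the replacement request run asynchronously in a real system or inline in a test.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical model of the standby strategy (names are illustrative, not Flink/YARN APIs).
// requestContainer is the slow path, such as asking YARN for a new container.
final class StandbyPool {
    private final Deque<String> standbys = new ArrayDeque<>();
    private final Supplier<String> requestContainer;
    private final Executor background;

    StandbyPool(Supplier<String> requestContainer, Executor background, int size) {
        this.requestContainer = requestContainer;
        this.background = background;
        for (int i = 0; i < size; i++) {
            standbys.add(requestContainer.get()); // pre-start idle standbys
        }
    }

    // Called when a worker dies: an idle standby takes over immediately, and a new
    // container is requested in the background to refill the pool.
    synchronized String onWorkerFailure() {
        String takeover = standbys.poll();
        CompletableFuture.supplyAsync(requestContainer, background).thenAccept(this::addStandby);
        // Only if the pool is empty do we block on the slow path.
        return takeover != null ? takeover : requestContainer.get();
    }

    synchronized void addStandby(String container) {
        standbys.add(container);
    }

    synchronized int standbyCount() {
        return standbys.size();
    }

    public static void main(String[] args) {
        AtomicInteger next = new AtomicInteger();
        // Runnable::run executes the "background" request inline, which keeps the demo deterministic.
        StandbyPool pool = new StandbyPool(() -> "container-" + next.getAndIncrement(), Runnable::run, 1);
        System.out.println(pool.onWorkerFailure()); // container-0 (the pre-started standby)
        System.out.println(pool.standbyCount());    // 1 (refilled with container-1)
    }
}
```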

2. Machine Failure Recovery

If a physical machine dies, YARN may take more than 3 minutes to detect the failure and reschedule its containers. Kuaishou's approach:

  • Monitoring (Hawk): Custom health checks detect node failure within 5 seconds.
  • Strategic Placement: Ensuring redundant containers are placed on different physical hosts than the active ones.
  • Result: Recovery time reduced to under 30 seconds.
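The detection and placement rules can likewise be sketched in plain Java. This is a hypothetical illustration, not Hawk's or YARN's interface: hosts report heartbeats, a host silent for more than 5 seconds is treated as dead, and a standby is placed only on a live host that runs no active worker.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch: heartbeat-based failure detection plus anti-affinity placement,
// i.e. standbys only go on live hosts that do not run an active worker.
final class HostAwarePlacement {
    static final long TIMEOUT_MS = 5_000; // assumed detection budget (the 5-second target)
    private final Map<String, Long> lastHeartbeat = new HashMap<>(); // host -> last heartbeat (ms)

    void heartbeat(String host, long nowMs) {
        lastHeartbeat.put(host, nowMs);
    }

    // A host is considered dead if it never reported or has been silent too long.
    boolean isDead(String host, long nowMs) {
        Long seen = lastHeartbeat.get(host);
        return seen == null || nowMs - seen > TIMEOUT_MS;
    }

    // Pick the first live candidate host that does not already run an active worker.
    Optional<String> placeStandby(List<String> candidates, Set<String> activeHosts, long nowMs) {
        for (String host : candidates) {
            if (!activeHosts.contains(host) && !isDead(host, nowMs)) {
                return Optional.of(host);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        HostAwarePlacement p = new HostAwarePlacement();
        p.heartbeat("host-1", 0);
        p.heartbeat("host-2", 0);
        p.heartbeat("host-3", 3_000);
        // At t = 6 s, host-2 has been silent for 6 s (dead) and host-1 runs an active worker.
        System.out.println(p.placeStandby(List.of("host-1", "host-2", "host-3"), Set.of("host-1"), 6_000));
        // Optional[host-3]
    }
}
```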

Sample Code Examples

Java: Incremental UV Calculation


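// Simplified Global State Bitmap UV (Flink 1.x API); DeviceEvent is a user-defined POJO
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

public class IncrementalUvCalculator extends KeyedProcessFunction<String, DeviceEvent, Long> {
    private static final long MINUTE_MS = 60_000L;
    private static final long DAY_MS = 24 * 60 * MINUTE_MS;

    private transient ValueState<Roaring64NavigableMap> uvState; // cross-window daily bitmap
    private transient ValueState<Long> currentDay;               // (UTC) day the bitmap covers

    @Override
    public void open(Configuration parameters) {
        // The bitmap falls back to Kryo serialization; a dedicated serializer is preferable in production
        uvState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("uv-bitmap", Roaring64NavigableMap.class));
        currentDay = getRuntimeContext().getState(new ValueStateDescriptor<>("uv-day", Long.class));
    }

    @Override
    public void processElement(DeviceEvent event, Context ctx, Collector<Long> out) throws Exception {
        long now = ctx.timerService().currentProcessingTime();
        long day = now / DAY_MS;
        Roaring64NavigableMap bitmap = uvState.value();
        Long storedDay = currentDay.value();
        if (bitmap == null || storedDay == null || storedDay != day) {
            bitmap = new Roaring64NavigableMap(); // first event of a new day: start fresh
            currentDay.update(day);
        }
        bitmap.addLong(event.getMappedId()); // dictionary-mapped long ID (needs the 64-bit bitmap)
        uvState.update(bitmap);
        // Emit at the end of the current minute; Flink keeps one timer per key and timestamp
        ctx.timerService().registerProcessingTimeTimer(now - now % MINUTE_MS + MINUTE_MS - 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
        Roaring64NavigableMap bitmap = uvState.value();
        if (bitmap != null) {
            out.collect(bitmap.getLongCardinality()); // daily UV so far
        }
    }
}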

SQL: Early Firing Window


-- Experimental, undocumented planner options: emit early (partial) results every 60 seconds
SET 'table.exec.emit.early-fire.enabled' = 'true';
SET 'table.exec.emit.early-fire.delay' = '60s';

-- Query to count distinct users daily, outputting updates every minute
SELECT 
    TUMBLE_ROWTIME(event_ts, INTERVAL '1' DAY) as end_of_day,
    dimension,
    COUNT(DISTINCT mapped_id) as uv_count
FROM user_activity
GROUP BY 
    TUMBLE(event_ts, INTERVAL '1' DAY), 
    dimension;
