Optimizing Apache Flink for Large-Scale Stream Processing at Kuaishou
Evolution of Stream Processing Systems
Stream processing focuses on handling unbounded data streams in real time, delivering results with minimal latency. To understand Flink's role, we trace the evolution of data systems:
- Early Batch Era: Google's MapReduce (2003) and Apache Hadoop popularized large-scale batch processing.
- Pipeline Improvements: Google Flume introduced pipeline-based processing to connect multiple MapReduce jobs efficiently.
- Early Streaming: Apache Storm (2011) offered low latency but lacked strong consistency guarantees during failover.
- Lambda Architecture: Combined fast but imprecise stream processing with accurate batch layers. It was powerful but complex to maintain.
- Micro-Batch Era: Spark Streaming treated streams as small batches (micro-batches), providing production-grade consistency.
- Messaging Foundations: Apache Kafka emerged as a distributed log, enabling data replay and serving as the backbone for modern stream architectures.
- Modern Streaming: Apache Flink matured around 2016, adopting concepts from Google Dataflow (like event time and watermarks) to provide true stateful stream processing with exactly-once semantics.
Core Challenges in Streaming
Designing a stream processor requires solving four dimensions, often referred to as the "4 Ws":
- What: What computation is being performed? (e.g., Map, Aggregate, Join).
- Where: Where in event time is the data scoped? Typically handled via Windows (Tumbling, Sliding, Session).
- When: When are results materialized? Controlled by Watermarks (for event time progression) and Triggers.
- How: How do results relate to previous ones?
- Discarding: Ignore previous state.
- Accumulating: Update state incrementally.
- Accumulating & Retracting: Emit an "undo" (retraction) for the old result and a new "insert" for the updated result.
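The three refinement modes can be illustrated with a small sketch (not Flink API). Suppose a window fires once with a count of 3, and then, after a late event arrives, fires again with a count of 4:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch (not Flink API): how a per-window count is emitted under each
// refinement mode when the window fires first with a count of 3 and then,
// after a late event, with a count of 4.
public class RefinementModes {
    public static List<String> discarding(int first, int second) {
        // Each firing stands alone; the second firing only reports the delta.
        List<String> out = new ArrayList<>();
        out.add("insert " + first);
        out.add("insert " + (second - first));
        return out;
    }

    public static List<String> accumulating(int first, int second) {
        // Each firing reports the full updated total; downstream must overwrite.
        List<String> out = new ArrayList<>();
        out.add("insert " + first);
        out.add("insert " + second);
        return out;
    }

    public static List<String> accumulatingRetracting(int first, int second) {
        // The second firing first undoes the old result, then inserts the new one.
        List<String> out = new ArrayList<>();
        out.add("insert " + first);
        out.add("retract " + first);
        out.add("insert " + second);
        return out;
    }
}
```

Retraction is what lets downstream consumers (e.g., a database sink) stay consistent when a window's result is revised.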
Key Flink Technologies
Distributed State & Exactly-Once
Flink manages state (e.g., sums, counts, windows) within the operator instances. Its core capability is guaranteeing Exactly-Once semantics, meaning an event affects the state exactly once, even if failures occur. This relies on:
- State Backends: Storing state in memory (HashMapStateBackend) or on disk (RocksDBStateBackend).
- Event Time: Processing based on when events actually occurred, rather than when they are processed.
Checkpointing Mechanism
Flink uses a lightweight snapshotting mechanism called Checkpointing. It uses a special record called a Barrier that flows with the data stream.
- A barrier is injected at the Source (e.g., Kafka offset 100).
- Operators receive the barrier, snapshot their current state (e.g., currentSum = 500), and send the barrier downstream.
- Once the barrier reaches the Sink, the checkpoint is acknowledged.
If a failure occurs, Flink restores the state from the last successful checkpoint. The Source resets to offset 100, and the operator resets currentSum to 500, ensuring no data is lost or duplicated.
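The key invariant is that the source offset and the operator state are snapshotted at the same logical point in the stream. A minimal simulation of this idea (not Flink internals):

```java
// Sketch (not Flink internals): an operator that sums values read from a
// replayable source, snapshots (offset, sum) together at a barrier, and
// restores both together so replayed records are neither lost nor
// double-counted.
public class CheckpointSketch {
    long offset = 0;      // position in the replayable source (e.g., a Kafka offset)
    long currentSum = 0;  // operator state

    long ckptOffset = 0, ckptSum = 0;  // last successful checkpoint

    void process(long value) {
        offset++;
        currentSum += value;
    }

    void onBarrier() {
        // Snapshot state consistently with the source position.
        ckptOffset = offset;
        ckptSum = currentSum;
    }

    void recover() {
        // Roll both back to the last checkpoint; the source replays from ckptOffset.
        offset = ckptOffset;
        currentSum = ckptSum;
    }
}
```

Because offset and sum are restored as a pair, every replayed record is applied to the state exactly once.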
Event Time and Watermarks
In the real world, events arrive out of order. Flink uses Watermarks to track event-time progress. A watermark with value T asserts that no more events with timestamps at or before T are expected. When a watermark passes the end of a window, the window is triggered for computation.
Late events arriving after the watermark can still be handled via Late Data policies, allowing the window to re-trigger and update results (Retractions).
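A common watermark strategy is "bounded out-of-orderness": the watermark trails the highest timestamp seen so far by a fixed delay. The class below is a hypothetical sketch that mirrors the idea behind Flink's built-in strategy, not its actual API:

```java
// Sketch of a bounded-out-of-orderness watermark generator: the watermark
// trails the highest timestamp seen by a fixed delay, so events up to
// `delayMs` late still count as "on time". Hypothetical class, illustrating
// the idea behind Flink's bounded-out-of-orderness strategy.
public class BoundedOutOfOrderness {
    private final long delayMs;
    private long maxTimestamp = Long.MIN_VALUE;

    public BoundedOutOfOrderness(long delayMs) {
        this.delayMs = delayMs;
    }

    public void onEvent(long eventTimestampMs) {
        // Out-of-order events never move the watermark backwards.
        maxTimestamp = Math.max(maxTimestamp, eventTimestampMs);
    }

    // A watermark T asserts: no further events with timestamp <= T are expected.
    public long currentWatermark() {
        return maxTimestamp - delayMs - 1;
    }
}
```

With a 5-second delay, an event stamped 10:00:10 advances the watermark only to just before 10:00:05, leaving room for stragglers.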
Window Types
- Tumbling Windows: Fixed-size, non-overlapping (e.g., 5-second buckets).
- Sliding Windows: Fixed-size but overlapping (e.g., 10-second window sliding every 5 seconds).
- Session Windows: Groups events by periods of activity separated by gaps of inactivity.
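The difference between tumbling and sliding windows comes down to how many windows a single timestamp belongs to. A small sketch of the assignment arithmetic (illustrative, not Flink's WindowAssigner API):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: which window(s) a timestamp falls into. Tumbling assigns exactly
// one bucket; sliding assigns size/slide overlapping buckets.
public class WindowAssign {
    // Start of the single tumbling window containing ts.
    public static long tumblingStart(long ts, long size) {
        return ts - (ts % size);
    }

    // Starts of all sliding windows containing ts, newest first.
    public static List<Long> slidingStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - (ts % slide);
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }
}
```

For example, with a 10-second window sliding every 5 seconds, the timestamp 12s falls into the windows starting at 10s and 5s.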
Kuaishou Production Practices
Real-Time UV (Unique Visitor) Calculation
Calculating UV (unique device IDs) is challenging due to massive scale (millions of QPS) and strict latency requirements. Kuaishou optimized this using a custom dictionary and state management.
1. Global Dictionary for ID Mapping
Storing raw string IDs (like UUIDs) in Flink state is memory-intensive. Kuaishou built a distributed dictionary inside Flink to map strings to compact Long integers.
Logic:
- Partitioning: Hash the device ID to a specific partition index.
- ID Generation: Each partition maintains a counter. When a new ID arrives, it assigns PartitionID * Offset + Counter++.
- State Storage: Use KeyedState to store the mapping deviceId -> Long. This allows Flink to rescale (change parallelism) easily.
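The point of the PartitionID * Offset + Counter++ formula is that each partition hands out ids from a disjoint range, so no cross-partition coordination is needed. A self-contained sketch (class and field names are illustrative, not Kuaishou's actual code; the map stands in for Flink KeyedState):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the per-partition dictionary idea: each partition owns a counter
// and assigns ids from its own disjoint range, so partitions never need to
// coordinate. Names are illustrative, not Kuaishou's actual classes.
public class PartitionDictionary {
    // Each partition may assign up to OFFSET ids before ranges would collide.
    static final long OFFSET = 1_000_000_000L;

    private final long partitionId;
    private long counter = 0;
    // In Flink this mapping would live in KeyedState, not an in-memory map.
    private final Map<String, Long> mapping = new HashMap<>();

    public PartitionDictionary(long partitionId) {
        this.partitionId = partitionId;
    }

    // Returns the existing compact id, or assigns the next one in this
    // partition's range.
    public long idFor(String deviceId) {
        return mapping.computeIfAbsent(deviceId, k -> partitionId * OFFSET + counter++);
    }
}
```

Repeated lookups of the same device ID return the same compact Long, which is what makes bitmap-based UV counting possible downstream.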
2. Handling Data Skew
Certain keys (e.g., celebrity accounts) generate massive traffic, causing "hot partitions."
Solution: Local Aggregation & Scatter
Instead of sending all data to one node:
- Add a random suffix to the key (e.g., user_123_01, user_123_02).
- Perform a local aggregation (sum) on these scattered keys.
- Remove the suffix and perform a final global aggregation.
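The steps above amount to a two-phase aggregation. A minimal sketch (method names are illustrative; in Flink the two phases would be separate keyed operators):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Sketch of two-phase aggregation for a hot key: scatter events across N
// sub-keys, pre-aggregate per sub-key, then merge back under the original
// key. Illustrative only; in Flink each phase is its own keyed operator.
public class ScatterAggregate {
    // Phase 1: spread a hot key's values over `fanout` sub-keys and pre-sum.
    public static Map<String, Long> localAggregate(String hotKey, long[] values,
                                                   int fanout, Random rnd) {
        Map<String, Long> partial = new HashMap<>();
        for (long v : values) {
            String subKey = hotKey + "_" + rnd.nextInt(fanout); // e.g., user_123_01
            partial.merge(subKey, v, Long::sum);
        }
        return partial;
    }

    // Phase 2: strip the suffix (conceptually) and merge partial sums into
    // the final total for the original key.
    public static long globalAggregate(Map<String, Long> partial) {
        return partial.values().stream().mapToLong(Long::longValue).sum();
    }
}
```

Because addition is associative, the final total is the same as single-node aggregation, but no one subtask sees all of the hot key's traffic.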
3. Incremental UV with Bitmaps
To calculate daily UV updated every minute without storing all raw IDs:
- Use a Global State (cross-window) holding a Bitmap.
- Define a 1-minute tumbling window just to trigger computation.
- Every minute, merge the IDs from the last minute into the Bitmap and output bitmap.getCardinality().
- At the end of the day (24 hours), clear the state.
Accelerating Failover Recovery
For high SLA requirements (99.99% uptime), recovery speed is critical. Kuaishou implemented strategies to reduce recovery time from minutes to seconds.
1. Container Crash Recovery
Normally, requesting a new YARN container takes 30-60 seconds. Kuaishou optimized this by:
- Redundant Containers: Keeping "standby" TaskManagers idle in the cluster. If a worker dies, a standby immediately takes over the state recovery.
- Proactive Allocation: Upon detecting a failure, immediately requesting a new container in the background while the standby takes over.
This reduced recovery time to under 20 seconds.
2. Machine Failure Recovery
If a physical machine dies, YARN might take >3 minutes to detect and reschedule. Kuaishou's approach:
- Monitoring (Hawk): Custom health checks detect node failure within 5 seconds.
- Strategic Placement: Ensuring redundant containers are placed on different physical hosts than the active ones.
- Result: Recovery time reduced to under 30 seconds.
Sample Code Examples
Java: Incremental UV Calculation
// Simplified logic for Global State Bitmap UV (RoaringBitmap from org.roaringbitmap)
public class IncrementalUvCalculator extends KeyedProcessFunction<String, DeviceEvent, Long> {
    private transient ValueState<RoaringBitmap> uvState;
    private transient ValueState<Long> lastUpdateTime;

    @Override
    public void open(Configuration parameters) {
        // State must be registered before use
        uvState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("uv-bitmap", RoaringBitmap.class));
        lastUpdateTime = getRuntimeContext().getState(
                new ValueStateDescriptor<>("last-update", Long.class));
    }

    @Override
    public void processElement(DeviceEvent event, Context ctx, Collector<Long> out) throws Exception {
        RoaringBitmap bitmap = uvState.value();
        if (bitmap == null) {
            bitmap = new RoaringBitmap();
        }
        // Add the mapped (dictionary-encoded) ID to the bitmap
        bitmap.add(event.getMappedId());
        uvState.update(bitmap);

        // Emit the running cardinality at most once per minute boundary
        long currentMinute = ctx.timerService().currentProcessingTime() / 60000 * 60000;
        if (lastUpdateTime.value() == null || lastUpdateTime.value() < currentMinute) {
            out.collect((long) bitmap.getCardinality());
            lastUpdateTime.update(currentMinute);
        }
    }
}
SQL: Early Firing Window
-- Configuration to fire every 60 seconds
SET 'table.exec.emit.early-fire.enabled' = 'true';
SET 'table.exec.emit.early-fire.delay' = '60s';
-- Query to count distinct users daily, outputting updates every minute
SELECT
TUMBLE_ROWTIME(event_ts, INTERVAL '1' DAY) as end_of_day,
dimension,
COUNT(DISTINCT mapped_id) as uv_count
FROM user_activity
GROUP BY
TUMBLE(event_ts, INTERVAL '1' DAY),
dimension;