Apache Flink Batch Processing: Word Count Implementation
Core Processing Workflow
A batch word count runs in five stages:
- Read input text data line by line.
- Tokenize each line into individual words.
- Group entries by the word key.
- Aggregate the frequency of each group.
- Output the final word and count pairs.
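The five stages above can be sketched in plain Java, without Flink, to make the data flow concrete. This is only an illustration: the class name WordCountStagesSketch and the method countWords are hypothetical, the input is hard-coded rather than read from a file, and a TreeMap is used so that the output is sorted by word.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class WordCountStagesSketch {

    // Stages 2 to 4: tokenize, group by word, and count each group.
    static Map<String, Long> countWords(List<String> lines) {
        return lines.stream()
                // Stage 2: split each line on whitespace into words
                .flatMap(line -> Arrays.stream(line.split("\\s+")))
                .filter(word -> !word.isEmpty())
                // Stages 3 and 4: group by the word and count the members of each group
                .collect(Collectors.groupingBy(word -> word, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // Stage 1: the input lines (hard-coded here instead of read from a file)
        List<String> lines = Arrays.asList("hello flink", "hello world", "hello java");
        // Stage 5: output each (word,count) pair
        countWords(lines).forEach((word, count) ->
                System.out.println("(" + word + "," + count + ")"));
    }
}
```

Flink distributes the same steps across parallel tasks, so its output order is not sorted the way this single-threaded sketch is.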
Project Configuration
Dependency Setup
Initialize a Maven project named FlinkBatchDemo. The POM must declare flink-streaming-java and flink-clients, with a version that matches your runtime environment (e.g., 1.17.0). The DataSet classes used later live in flink-java, which is expected to arrive as a transitive dependency of flink-streaming-java; if your build cannot resolve org.apache.flink.api.java.ExecutionEnvironment, declare flink-java explicitly.
<properties>
    <flink.version>1.17.0</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
Input Data Preparation
Create a plain text file named input.txt to hold the test input. Place it in a data directory at the project root, because the code reads the relative path data/input.txt and resolves it against the working directory of the job. Sample content:
hello flink
hello world
hello java
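If you prefer to generate the sample file from code, a minimal sketch using java.nio.file could look like the following. The class name InputFileSketch and the method writeSample are hypothetical helpers, not part of Flink, and the sketch assumes it is run from the project root so that data/input.txt lands where the job expects it.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

public class InputFileSketch {

    // Writes the three sample lines to the given path,
    // creating missing parent directories first.
    static Path writeSample(Path target) throws IOException {
        List<String> lines = Arrays.asList("hello flink", "hello world", "hello java");
        Path parent = target.getParent();
        if (parent != null) {
            Files.createDirectories(parent);
        }
        return Files.write(target, lines, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // Relative path: resolved against the current working directory
        Path written = writeSample(Paths.get("data", "input.txt"));
        System.out.println("Wrote " + written.toAbsolutePath());
    }
}
```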
Code Implementation
Create a Java class in a package of your choice (e.g., com.example.flink.batch) to hold the job logic. The implementation uses the legacy DataSet API (ExecutionEnvironment), which is specific to batch processing.
package com.example.flink.batch;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCountApplication {

    public static void main(String[] args) throws Exception {
        // Initialize the batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // Load the text file as a DataSource, one element per line.
        // Note: 'var' needs Java 10 or later.
        String filePath = "data/input.txt";
        var source = env.readTextFile(filePath);

        // Apply the transformation pipeline
        source
            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                    // Split the line on whitespace and emit (word, 1) for each non-empty token
                    String[] tokens = line.split("\\s+");
                    for (String token : tokens) {
                        if (!token.isEmpty()) {
                            out.collect(Tuple2.of(token, 1));
                        }
                    }
                }
            })
            // Group by tuple field 0 (the word)
            .groupBy(0)
            // Sum tuple field 1 (the count)
            .sum(1)
            // Print results to the console; in the DataSet API, print() also triggers job execution
            .print();
    }
}
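The empty-token check inside flatMap matters because String.split keeps a leading empty string when the line starts with whitespace, and returns a single empty string for a blank line. The following plain-Java sketch reproduces the tokenization step outside Flink to show this; TokenizeSketch and tokenize are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class TokenizeSketch {

    // Mirrors the flatMap body: split on whitespace and drop empty tokens.
    static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        for (String token : line.split("\\s+")) {
            if (!token.isEmpty()) {
                words.add(token);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        // Leading whitespace makes split() return an empty first element.
        System.out.println("  hello flink".split("\\s+").length); // prints 3
        System.out.println(tokenize("  hello flink"));            // prints [hello, flink]
        // A blank line splits into one empty string, which the guard discards.
        System.out.println(tokenize(""));                         // prints []
    }
}
```

Without the guard, a blank or indented input line would contribute an empty-string "word" to the counts.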
Expected Console Output
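On successful execution, the job prints one (word,count) pair for each distinct word in the input file. Because the job runs with a parallelism of 2, the order of the lines can differ between runs: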
(flink,1)
(world,1)
(java,1)
(hello,3)
Troubleshooting Logging Errors
Logging warnings at startup usually mean that an SLF4J binding or a logger configuration file is missing or misconfigured.
SLF4J Warning
If the console shows the warning Failed to load class "org.slf4j.impl.StaticLoggerBinder", SLF4J found no logging backend on the classpath. Adding the following dependencies binds it to Log4j 1.2:
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.32</version>
</dependency>
Log4j Configuration Missing
A warning such as "log4j:WARN No appenders could be found for logger" means Log4j found no configuration file. Create log4j.properties in the src/main/resources directory with the following content:
log4j.rootLogger=warn,CONSOLE,File
# Console Appender
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss SSS} [%t] [%c] [%p] - %m%n
# File Appender
log4j.appender.File=org.apache.log4j.DailyRollingFileAppender
log4j.appender.File.layout=org.apache.log4j.PatternLayout
log4j.appender.File.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss SSS} [%t] [%c] [%p] - %m%n
log4j.appender.File.datePattern='.'yyyy-MM-dd
log4j.appender.File.File=./logs/flink-job.log
log4j.appender.File.Threshold=info
log4j.appender.File.append=true
Ensure the path for the file appender exists before running the application.
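One way to follow the note above is to create the directory from code before the job starts. The sketch below is an assumption about how you might do that, not part of Flink or Log4j: LogDirectorySketch and ensureDirectory are hypothetical names, and the relative path logs is taken from the log4j.appender.File.File setting.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class LogDirectorySketch {

    // Creates the directory and any missing parents;
    // does nothing if the directory already exists.
    static Path ensureDirectory(Path dir) throws IOException {
        return Files.createDirectories(dir);
    }

    public static void main(String[] args) throws IOException {
        // "logs" matches the directory part of the file appender path
        Path logs = ensureDirectory(Paths.get("logs"));
        System.out.println("Log directory ready: " + logs.toAbsolutePath());
    }
}
```

The relative path is resolved against the working directory, so run this from the same directory as the job.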