Fading Coder

One Final Commit for the Last Sprint


Apache Flink Batch Processing: Word Count Implementation


Core Processing Workflow

The standard approach for batch processing word counts involves five distinct stages:

  1. Read input text data line by line.
  2. Tokenize each line into individual words.
  3. Group entries by the word key.
  4. Aggregate the frequency of each group.
  5. Output the final word and count pairs.
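Before introducing Flink, the five stages can be sketched in plain Java with no Flink dependency (class and method names here are illustrative, not part of the Flink job below):

```java
import java.util.*;
import java.util.stream.*;

public class PlainWordCount {

    // Stages 2-4: tokenize each line, group by word, count each group.
    public static Map<String, Long> count(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split("\\s+"))) // tokenize
                .filter(token -> !token.isEmpty())
                .collect(Collectors.groupingBy(w -> w, Collectors.counting())); // group + aggregate
    }

    public static void main(String[] args) {
        // Stage 1 (input) inlined with the sample strings from this article
        List<String> lines = List.of("hello flink", "hello world", "hello java");
        // Stage 5: emit (word,count) pairs
        count(lines).forEach((word, n) -> System.out.println("(" + word + "," + n + ")"));
    }
}
```

Flink distributes exactly this logic across parallel tasks; the shape of the pipeline stays the same.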

Project Configuration

Dependency Setup

Initialize a Maven project named FlinkBatchDemo. The POM file must include the core Flink libraries required for Java streaming and client execution. Ensure the version matches your runtime environment (e.g., 1.17.0).

<properties>
    <flink.version>1.17.0</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
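Note that the DataSet API used below lives in the flink-java artifact. It usually arrives transitively via the dependencies above, but if ExecutionEnvironment cannot be resolved in your build, it can be declared explicitly (reusing the same version property):

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
```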

Input Data Preparation

Create a resource file within the project structure to hold test input. A plain text file named input.txt should reside in a data directory at the project root, containing sample strings such as:

hello flink
hello world
hello java

Code Implementation

Create a Java class under a designated package (e.g., com.example.flink.batch) to execute the logic. The implementation uses the legacy DataSet API (ExecutionEnvironment), which is specific to batch processing and has been deprecated since Flink 1.12 in favor of batch execution on the DataStream API.

package com.example.flink.batch;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCountApplication {

    public static void main(String[] args) throws Exception {
        // Initialize the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // Load text file as a DataSource
        String filePath = "data/input.txt";
        var source = env.readTextFile(filePath);

        // Apply transformation pipeline
        source
            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                    // Split line by whitespace
                    String[] tokens = line.split("\\s+");
                    for (String token : tokens) {
                        if (!token.isEmpty()) {
                            out.collect(Tuple2.of(token, 1));
                        }
                    }
                }
            })
            // Group by the first column (the word)
            .groupBy(0)
            // Sum the second column (the count)
            .sum(1)
            // Print results to console
            .print();
    }
}
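The FlatMapFunction above splits on whitespace only, so punctuation and letter case are preserved in the tokens. For real text you may want to normalize first; one possible helper (plain Java, independent of Flink; the class name is illustrative) is:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

public class Tokenizer {

    // Normalize a line: lowercase it, then keep only word-character runs as tokens.
    public static List<String> tokens(String line) {
        List<String> out = new ArrayList<>();
        for (String raw : line.toLowerCase(Locale.ROOT).split("\\W+")) {
            if (!raw.isEmpty()) {
                out.add(raw);
            }
        }
        return out;
    }
}
```

Inside the Flink job, out.collect(Tuple2.of(token, 1)) would then be called once per normalized token instead of once per raw split result.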

Expected Console Output

Upon successful execution, the job aggregates the occurrences of each word from the input file. With parallelism greater than 1, the order of the output lines may vary between runs:

(flink,1)
(world,1)
(java,1)
(hello,3)

Troubleshooting Logging Errors

Common runtime warnings regarding logging frameworks often occur due to missing or misconfigured bindings.

SLF4J Warning

If you encounter errors stating Failed to load class "org.slf4j.impl.StaticLoggerBinder", add the following dependencies to resolve the logger binding issue:

<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.32</version>
</dependency>

Log4j Configuration Missing

Warnings indicating that appenders cannot be found require a dedicated configuration file. Create log4j.properties inside the src/main/resources directory with the following content:

log4j.rootLogger=warn,CONSOLE,File

# Console Appender
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss SSS} [%t] [%c] [%p] - %m%n

# File Appender
log4j.appender.File=org.apache.log4j.DailyRollingFileAppender
log4j.appender.File.layout=org.apache.log4j.PatternLayout
log4j.appender.File.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss SSS} [%t] [%c] [%p] - %m%n
log4j.appender.File.datePattern='.'yyyy-MM-dd
log4j.appender.File.File=./logs/flink-job.log
log4j.appender.File.Threshold=info
log4j.appender.File.append=true

Ensure the path for the file appender exists before running the application.
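On Linux or macOS, the directory can be created relative to the working directory before launching the job (the path mirrors the appender configuration above):

```shell
# Create the log directory expected by the file appender; -p makes this a no-op if it already exists.
mkdir -p ./logs
```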

